Skip to content

Commit

Permalink
Add actors
Browse files Browse the repository at this point in the history
  • Loading branch information
vegansk committed Mar 28, 2017
1 parent 8d4dee9 commit 80e9098
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 2 deletions.
3 changes: 3 additions & 0 deletions config.nims
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ task test, "Run all tests":

task test_either, "Run Either tests":
test "either"

task test_concurrent, "Run concurrent tests":
test "concurrent"
4 changes: 2 additions & 2 deletions nimfp.nimble
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
# Package
version = "0.3.8"
version = "0.4.0"
author = "Anatoly Galiulin <[email protected]>"
description = "Nim functional programming library"
license = "MIT"

srcDir = "src"

# Deps
requires "nim >= 0.14.3", "nimboost >= 0.3.2", "classy >= 0.0.2"
requires "nim >= 0.14.3", "nimboost >= 0.4.5", "classy >= 0.0.2"
2 changes: 2 additions & 0 deletions src/fp.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import fp.option,
fp.either,
fp.trym,
fp.futurem,
fp.concurrent,
fp.map,
fp.function,
fp.stream,
Expand All @@ -17,6 +18,7 @@ export option,
either,
trym,
futurem,
concurrent,
map,
function,
stream,
Expand Down
115 changes: 115 additions & 0 deletions src/fp/concurrent.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import boost.types,
boost.typeutils,
threadpool,
fp.futurem,
future,
sharedlist

type Strategy*[A] = proc(a: () -> A): () -> A

proc sequentalStrategy*[A](a: () -> A): () -> A {.procvar.} =
let r = a()
result = () => r

proc spawnStrategy*[A](a: proc(): A {.gcsafe.}): () -> A {.procvar.} =
let r = spawn a()
result = proc(): auto =
return ^r

proc asyncStrategy*[A](a: () -> A): () -> A {.procvar.} =
let f = newFuture[A](a)
result = proc(): auto =
f.run.get

#TODO: Implement actor without Channels

type Handler*[A] = proc(v: A) {.gcsafe.}
type HandlerS*[S,A] = proc(s: S, v: A): S {.gcsafe.}
type ErrorHandler* = proc(e: ref Exception): void

let rethrowError*: ErrorHandler = proc(e: ref Exception) =
raise e

type Actor*[A] = ref object
strategy: Strategy[Unit]
handler: Handler[A]
onError: ErrorHandler
messages: Channel[A]
activeReader: bool

proc releaseActor[A](a: Actor[A]) =
a.messages.close

proc newActor*[A](strategy: Strategy[Unit], handler: Handler[A], onError = rethrowError): Actor[A] =
new(result, releaseActor[A])
result.strategy = strategy
result.handler = handler
result.onError = onError
result.messages.open
result.activeReader = false

proc processQueue[A](a: ptr Actor[A]) =
while true:
let (hasMsg, msg) = a[].messages.tryRecv()
if not hasMsg:
discard cas(a[].activeReader.addr, true, false)
break
try:
a[].handler(msg)
except:
a[].onError(getCurrentException())

proc send*[A](a: Actor[A]|ptr Actor[A], v: A) =
let ap = when a is ptr: a else: a.unsafeAddr
ap[].messages.send(v)
if cas(ap[].activeReader.addr, false, true):
discard a.strategy(() => (ap.processQueue(); ()))()

proc `!`*[A](a: Actor[A]|ptr Actor[A], v: A) =
a.send(v)

type ActorS*[S,A] = ref object
strategy: Strategy[Unit]
handler: HandlerS[S,A]
onError: ErrorHandler
messages: Channel[A]
state: Channel[S]
activeReader: bool

proc releaseActorS[S,A](a: ActorS[S,A]) =
a.state.close
a.messages.close

proc newActorS*[S,A](strategy: Strategy[Unit], initialState: S, handler: HandlerS[S,A], onError = rethrowError): ActorS[S,A] =
new(result, releaseActorS[S,A])
result.strategy = strategy
result.handler = handler
result.onError = onError
result.messages.open
result.state.open
result.state.send(initialState)
result.activeReader = false

proc processQueue[S,A](a: ptr ActorS[S,A]) =
while true:
let (hasMsg, msg) = a[].messages.tryRecv()
if not hasMsg:
discard cas(a[].activeReader.addr, true, false)
break
let (hasState, state) = a[].state.tryRecv()
assert hasState
try:
a[].state.send(a[].handler(state, msg))
except:
a[].onError(getCurrentException())
# Set old state
a[].state.send(state)

proc send*[S,A](a: ActorS[S,A]|ptr ActorS[S,A], v: A) =
let ap = when a is ptr: a else: a.unsafeAddr
ap[].messages.send(v)
if cas(ap[].activeReader.addr, false, true):
discard a.strategy(() => (ap.processQueue(); ()))()

proc `!`*[S,A](a: ActorS[S,A]|ptr ActorS[S,A], v: A) =
a.send(v)
1 change: 1 addition & 0 deletions tests/fp/test_all.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ import fp,
./test_futurem,
./test_mtransf,
./test_iterable,
./test_concurrent,

./std/test_jsonops
28 changes: 28 additions & 0 deletions tests/fp/test_concurrent.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import future,
fp,
unittest

suite "Concurrent":

test "Strategy":
check sequentalStrategy(() => 1)() == 1
check spawnStrategy(() => 1)() == 1
check asyncStrategy(() => 1)() == 1

test "Actor":
let actor = newActor[string](spawnStrategy) do(v: string) -> void:
echo v
actor ! "Hello, world!"
actor.unsafeAddr ! "Hello, world 2!"

test "Actor with state":
let actor = newActorS[string, string](spawnStrategy, "") do(state, v: string) -> string:
result = state
if v == "end":
echo state
else:
result &= v & "\n"
actor ! "Hello, world!"
actor.unsafeAddr ! "Hello, world 2!"
actor.unsafeAddr ! "end"

0 comments on commit 80e9098

Please sign in to comment.