Skip to content

Commit

Permalink
reply with a :skipped message to the client when handle_enqueue/deque…
Browse files Browse the repository at this point in the history
…ue skip a checkout
  • Loading branch information
sneako committed Apr 27, 2020
1 parent 2a0cf7d commit 898850b
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 19 deletions.
7 changes: 6 additions & 1 deletion lib/nimble_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ defmodule NimblePool do
send_call(pid, ref, {:checkout, command})

receive do
{^ref, :skipped} ->
Process.demonitor(ref, [:flush])
exit!(:skipped, :checkout, [pool])

{^ref, client_state} ->
Process.demonitor(ref, [:flush])

Expand Down Expand Up @@ -327,7 +331,7 @@ defmodule NimblePool do
{:noreply, maybe_checkout(command, mon_ref, from, state)}

{:skip, state} ->
{:noreply, state}
{:reply, :skipped, state}
end
end

Expand Down Expand Up @@ -567,6 +571,7 @@ defmodule NimblePool do
requests = Map.delete(state.requests, ref)
monitors = Map.delete(state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
GenServer.reply({pid, ref}, :skipped)
%{state | requests: requests, monitors: monitors}

{:empty, _} ->
Expand Down
104 changes: 86 additions & 18 deletions test/nimble_pool_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -961,35 +961,103 @@ defmodule NimblePoolTest do
end

describe "handle_enqueue & handle_dequeue" do
defmodule StatelessPoolWithHandleEnqueueAndDequeue do
@behaviour NimblePool
test "executes handle_enqueue and handle_dequeue callbacks when defined" do
defmodule PoolWithHandleEnqueueAndDequeue do
@behaviour NimblePool

def init_worker(from), do: {:ok, from, from}
def init_worker(parent) do
{:ok, parent, parent}
end

def handle_checkout({:wrapped, _command}, from, worker_state) do
{:ok, from, worker_state}
end
def handle_checkout(_command, _from, parent) do
{:ok, parent, parent}
end

def handle_enqueue(command, pool_state) do
send(pool_state.state, :enqueued)
{:ok, command, pool_state}
end

def handle_enqueue(command, %{state: {pid, ref}} = state) do
send(pid, {ref, :enqueued})
{:ok, {:wrapped, command}, state}
def handle_dequeue(command, pool_state) do
send(pool_state.state, :dequeued)
{:ok, command, pool_state}
end
end

def handle_dequeue({:wrapped, _command}, %{state: {pid, ref}} = state) do
send(pid, {ref, :dequeued})
{:ok, state}
parent = self()
pool = start_pool!(PoolWithHandleEnqueueAndDequeue, parent, [])

NimblePool.checkout!(pool, :checkout, fn _, _ -> {:ok, :ok} end)

assert_receive :enqueued
assert_receive :dequeued
end

test "checkout will not be called and worker will not restart if handle_enqueue returns :skip tuple" do
defmodule PoolThatSkipsOnEnqueue do
@behaviour NimblePool

def init_worker(parent) do
send(parent, :init_worker)
{:ok, parent, parent}
end

def handle_checkout(_command, _from, worker_state) do
flunk("handle_checkout/3 should be skipped")
{:ok, worker_state, worker_state}
end

def handle_enqueue(_command, parent) do
{:skip, parent}
end
end

parent = self()
pool = start_pool!(PoolThatSkipsOnEnqueue, parent, [])

assert_receive :init_worker

assert catch_exit(
NimblePool.checkout!(pool, :checkout, fn _ref, state -> {state, state} end)
) ==
{:skipped, {NimblePool, :checkout, [pool]}}

refute_receive :init_worker
end

test "executes handle_enqueue and handle_dequeue callbacks when defined" do
test "checkout will not be called and worker will not restart if handle_dequeue returns :skip tuple" do
defmodule PoolThatSkipsOnDequeue do
@behaviour NimblePool

def init_worker(parent) do
send(parent, :init_worker)
{:ok, parent, parent}
end

def handle_checkout(_command, _from, worker_state) do
flunk("handle_checkout/3 should be skipped")
{:ok, worker_state, worker_state}
end

def handle_enqueue(command, parent) do
{:ok, {:wrapped, command}, parent}
end

def handle_dequeue({:wrapped, _command}, parent) do
{:skip, parent}
end
end

parent = self()
ref = make_ref()
pool = start_pool!(StatelessPoolWithHandleEnqueueAndDequeue, {parent, ref}, [])
pool = start_pool!(PoolThatSkipsOnDequeue, parent, [])
assert_receive :init_worker

NimblePool.checkout!(pool, :checkout, fn _, _ -> {:ok, :ok} end)
assert catch_exit(
NimblePool.checkout!(pool, :checkout, fn _ref, state -> {state, state} end)
) ==
{:skipped, {NimblePool, :checkout, [pool]}}

assert_receive {^ref, :enqueued}
assert_receive {^ref, :dequeued}
refute_receive :init_worker
end
end
end

0 comments on commit 898850b

Please sign in to comment.