Add optional handle_enqueue/2 and handle_dequeue/2 callbacks #7

Merged · 13 commits · Apr 28, 2020
Changes from 5 commits
149 changes: 96 additions & 53 deletions lib/nimble_pool.ex
@@ -90,6 +90,28 @@ defmodule NimblePool do
@callback handle_info(message :: term, worker_state) ::
{:ok, worker_state} | {:remove, user_reason}

@doc """
Executed by the pool whenever a request to check out a worker is enqueued.

The implementation should be very fast, as it will block the pool.
Useful for tracking load on the pool. It must return the pool_state.

This callback is optional.
"""
@callback handle_enqueue(command :: term, pool_state) ::
{:ok, maybe_wrapped_command :: term, pool_state} | {:skip, pool_state}

@doc """
Executed by the pool whenever a request to check out a worker is dequeued.

The implementation should be very fast, as it will block the pool.
Useful for tracking load on the pool. It must return the pool_state.

This callback is optional.
"""
@callback handle_dequeue(maybe_wrapped_command :: term, pool_state) ::
{:ok, command :: term, pool_state} | {:skip, pool_state}

@doc """
Terminates a worker.

@@ -117,7 +139,12 @@ defmodule NimblePool do
) ::
{:ok, pool_state}

@optional_callbacks init_pool: 1, handle_checkin: 3, handle_info: 2, terminate_worker: 3
@optional_callbacks init_pool: 1,
handle_checkin: 3,
handle_info: 2,
handle_enqueue: 2,
handle_dequeue: 2,
terminate_worker: 3

@doc """
Defines a pool to be started under the supervision tree.
@@ -199,7 +226,7 @@ defmodule NimblePool do
end

ref = Process.monitor(pid)
send_call(pid, ref, {:checkout, command, deadline(timeout)})
send_call(pid, ref, {:checkout, command})

receive do
{^ref, client_state} ->
@@ -209,7 +236,7 @@ defmodule NimblePool do
function.({pid, ref}, client_state)
catch
kind, reason ->
send(pid, {__MODULE__, :cancel, ref, kind})
send_cancel(pid, ref, kind)
:erlang.raise(kind, reason, __STACKTRACE__)
else
{result, client_state} ->
@@ -224,11 +251,16 @@ defmodule NimblePool do
exit!(reason, :checkout, [pool])
after
timeout ->
Process.demonitor(ref, [:flush])
send_cancel(pid, ref, :timeout)
exit!(:timeout, :checkout, [pool])
end
end

defp send_cancel(pid, ref, reason) do
send(pid, {__MODULE__, :cancel, ref, reason})
Process.demonitor(ref, [:flush])
end

@doc """
Pre-checks the given `worker_state` in.

@@ -242,12 +274,6 @@ defmodule NimblePool do
send(pid, {__MODULE__, :precheckin, ref, worker_state})
end

defp deadline(timeout) when is_integer(timeout) do
System.monotonic_time() + System.convert_time_unit(timeout, :millisecond, :native)
end

defp deadline(:infinity), do: :infinity

defp get_node({_, node}), do: node
defp get_node(pid) when is_pid(pid), do: node(pid)

@@ -290,13 +316,19 @@ defmodule NimblePool do
end

@impl true
def handle_call({:checkout, command, deadline}, {pid, ref} = from, state) do
%{requests: requests, monitors: monitors} = state
mon_ref = Process.monitor(pid)
requests = Map.put(requests, ref, {pid, mon_ref, :command, command, deadline})
monitors = Map.put(monitors, mon_ref, ref)
state = %{state | requests: requests, monitors: monitors}
{:noreply, maybe_checkout(command, mon_ref, deadline, from, state)}
def handle_call({:checkout, command}, {pid, ref} = from, state) do
case handle_enqueue(command, state) do
{:ok, command, state} ->
%{requests: requests, monitors: monitors} = state
mon_ref = Process.monitor(pid)
requests = Map.put(requests, ref, {pid, mon_ref, :command, command})
monitors = Map.put(monitors, mon_ref, ref)
state = %{state | requests: requests, monitors: monitors}
{:noreply, maybe_checkout(command, mon_ref, from, state)}

{:skip, state} ->
{:noreply, state}
end
end

@impl true
@@ -439,7 +471,7 @@ defmodule NimblePool do

case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _, _}} ->
%{^ref => {_, mon_ref, :command, _command}} ->
Process.demonitor(mon_ref, [:flush])
monitors = Map.delete(monitors, mon_ref)
requests = Map.delete(requests, ref)
@@ -486,8 +518,8 @@ defmodule NimblePool do
{{:value, {pid, ref}}, queue} ->
case requests do
# The request still exists, so we are good to go
%{^ref => {^pid, mon_ref, :command, command, deadline}} ->
maybe_checkout(command, mon_ref, deadline, {pid, ref}, %{state | queue: queue})
%{^ref => {^pid, mon_ref, :command, command}} ->
maybe_checkout(command, mon_ref, {pid, ref}, %{state | queue: queue})

# It should never happen
%{^ref => _} ->
@@ -503,50 +535,45 @@ defmodule NimblePool do
end
end

defp maybe_checkout(command, mon_ref, deadline, {pid, ref} = from, state) do
defp maybe_checkout(command, mon_ref, {pid, ref} = from, state) do
%{resources: resources, requests: requests, worker: worker, queue: queue} = state

if past_deadline?(deadline) do
requests = Map.delete(state.requests, ref)
monitors = Map.delete(state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
maybe_checkout(%{state | requests: requests, monitors: monitors})
else
case :queue.out(resources) do
{{:value, worker_server_state}, resources} ->
args = [command, from, worker_server_state]
with {:ok, state} <- handle_dequeue(command, state),
{{:value, worker_server_state}, resources} <- :queue.out(resources) do
args = [command, from, worker_server_state]

case apply_worker_callback(worker, :handle_checkout, args) do
{:ok, worker_client_state, worker_server_state} ->
GenServer.reply({pid, ref}, worker_client_state)
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
%{state | resources: resources, requests: requests}
case apply_worker_callback(worker, :handle_checkout, args) do
{:ok, worker_client_state, worker_server_state} ->
GenServer.reply({pid, ref}, worker_client_state)

{:remove, reason} ->
state = remove(reason, worker_server_state, state)
maybe_checkout(command, mon_ref, deadline, from, %{state | resources: resources})
requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})

other ->
raise """
unexpected return from #{inspect(worker)}.handle_checkout/3.
%{state | resources: resources, requests: requests}

Expected: {:ok, client_state, server_state} | {:remove, reason}
Got: #{inspect(other)}
"""
end
{:remove, reason} ->
state = remove(reason, worker_server_state, state)
maybe_checkout(command, mon_ref, from, %{state | resources: resources})

other ->
raise """
unexpected return from #{inspect(worker)}.handle_checkout/3.

{:empty, _} ->
%{state | queue: :queue.in(from, queue)}
Expected: {:ok, client_state, server_state} | {:remove, reason}
Got: #{inspect(other)}
"""
end
end
end
else
{:skip, state} ->
requests = Map.delete(state.requests, ref)
monitors = Map.delete(state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
%{state | requests: requests, monitors: monitors}

defp past_deadline?(deadline) when is_integer(deadline) do
System.monotonic_time() >= deadline
{:empty, _} ->
%{state | queue: :queue.in(from, queue)}
end
end

defp past_deadline?(_), do: false

defp remove(reason, worker_server_state, state) do
state = maybe_terminate_worker(reason, worker_server_state, state)
schedule_init()
@@ -634,4 +661,20 @@ defmodule NimblePool do

defp crash_reason(:throw, value), do: {:nocatch, value}
defp crash_reason(_, value), do: value

defp handle_enqueue(command, %{worker: worker} = pool_state) do
if function_exported?(worker, :handle_enqueue, 2) do
apply_worker_callback(worker, :handle_enqueue, [command, pool_state])
else
{:ok, command, pool_state}
end
end

defp handle_dequeue(command, %{worker: worker} = pool_state) do
if function_exported?(worker, :handle_dequeue, 2) do
apply_worker_callback(worker, :handle_dequeue, [command, pool_state])
else
{:ok, pool_state}
end
end
end
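
For illustration only — the following sketch is not part of this diff. It shows a worker using both new callbacks to track load on the pool: `handle_enqueue/2` wraps the checkout command with a timestamp, and `handle_dequeue/2` reports how long the request has been waiting. The shapes follow the pool's dispatch above and the test below: at this commit both callbacks receive the pool's internal state map (with the user state under `:state`), `handle_dequeue/2` returns `{:ok, pool_state}` rather than the three-element tuple in the typespec, and `handle_checkout/3` still receives the wrapped command. The module name, reporter pid, and message shapes are invented for the example.

```elixir
defmodule InstrumentedWorker do
  @behaviour NimblePool

  # The user pool state is assumed to be a reporter pid, e.g. a pool started with
  #   NimblePool.start_link(worker: {InstrumentedWorker, self()})
  @impl NimblePool
  def init_worker(reporter) when is_pid(reporter), do: {:ok, :stateless_worker, reporter}

  # At this commit the command reaches handle_checkout/3 still wrapped by handle_enqueue/2.
  @impl NimblePool
  def handle_checkout({:wrapped, _enqueued_at, _command}, _from, worker_state) do
    {:ok, worker_state, worker_state}
  end

  # Runs inside the pool process, so keep it cheap: stamp and wrap the command,
  # and tell the reporter that one more checkout request has been enqueued.
  @impl NimblePool
  def handle_enqueue(command, %{state: reporter} = pool_state) do
    send(reporter, :request_enqueued)
    {:ok, {:wrapped, System.monotonic_time(), command}, pool_state}
  end

  # Called whenever the pool tries to match this request with a worker; report
  # how long the request has been waiting so far.
  @impl NimblePool
  def handle_dequeue({:wrapped, enqueued_at, _command}, %{state: reporter} = pool_state) do
    waited = System.monotonic_time() - enqueued_at
    send(reporter, {:waited_ms, System.convert_time_unit(waited, :native, :millisecond)})
    {:ok, pool_state}
  end
end
```

Because `maybe_checkout/4` calls `handle_dequeue/2` each time it tries to hand the request to a worker, a request that waits for a free worker can be reported more than once; the last report reflects its total time in the queue.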
45 changes: 41 additions & 4 deletions test/nimble_pool_test.exs
@@ -262,7 +262,7 @@ defmodule NimblePoolTest do
assert_receive {:terminate, :shutdown}
end

test "does not restart worker on client timeout during checkout" do
test "restarts worker on client timeout during checkout" do
parent = self()

pool =
@@ -290,15 +290,19 @@ defmodule NimblePoolTest do

:sys.resume(pool)

# Terminated and restarted
assert_receive {:terminate, :timeout}
assert_receive :started
refute_received :started

# Do a proper checkout now
assert NimblePool.checkout!(pool, :checkout, fn _ref, :client_state_out ->
{:result, :client_state_in}
end) == :result

# Did not have to start a new worker after the previous timeout
refute_received :started

# Assert down from failed checkout! did not leak
NimblePool.stop(pool, :shutdown)
refute_received {:DOWN, _, _, _, _}
assert_receive {:terminate, :shutdown}
end

@@ -955,4 +959,37 @@ defmodule NimblePoolTest do
assert_drained agent
end
end

describe "handle_enqueue & handle_dequeue" do
defmodule StatelessPoolWithHandleEnqueueAndDequeue do
@behaviour NimblePool

def init_worker(from), do: {:ok, from, from}

def handle_checkout({:wrapped, _command}, from, worker_state) do
{:ok, from, worker_state}
end

def handle_enqueue(command, %{state: {pid, ref}} = state) do
send(pid, {ref, :enqueued})
{:ok, {:wrapped, command}, state}
end

def handle_dequeue({:wrapped, _command}, %{state: {pid, ref}} = state) do
send(pid, {ref, :dequeued})
{:ok, state}
end
end

test "executes handle_enqueue and handle_dequeue callbacks when defined" do
parent = self()
ref = make_ref()
pool = start_pool!(StatelessPoolWithHandleEnqueueAndDequeue, {parent, ref}, [])

NimblePool.checkout!(pool, :checkout, fn _, _ -> {:ok, :ok} end)

assert_receive {^ref, :enqueued}
assert_receive {^ref, :dequeued}
end
end
end
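
A second sketch, also not part of the diff, covering the `{:skip, pool_state}` branch that the test above does not exercise: shedding load by refusing to enqueue checkouts while the pool is draining. The `:mode` key is invented for the example and nothing here shows how it would be flipped at runtime; the point is the return shape. As of this commit a skipped request is never queued or monitored, so the caller's `NimblePool.checkout!/4` exits with `:timeout` once its own timeout elapses.

```elixir
defmodule DrainableWorker do
  @behaviour NimblePool

  # Started with a user state such as %{mode: :accepting} (key name invented here).
  @impl NimblePool
  def init_worker(%{mode: _} = pool_state), do: {:ok, :stateless_worker, pool_state}

  @impl NimblePool
  def handle_checkout({:wrapped, _command}, _from, worker_state) do
    {:ok, worker_state, worker_state}
  end

  # While draining, do not enqueue the request at all.
  @impl NimblePool
  def handle_enqueue(_command, %{state: %{mode: :draining}} = pool_state) do
    {:skip, pool_state}
  end

  def handle_enqueue(command, pool_state) do
    {:ok, {:wrapped, command}, pool_state}
  end

  @impl NimblePool
  def handle_dequeue({:wrapped, _command}, pool_state), do: {:ok, pool_state}
end
```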