Add optional handle_enqueue/2 and handle_dequeue/2 callbacks #7

Merged · 13 commits · Apr 28, 2020
166 changes: 112 additions & 54 deletions lib/nimble_pool.ex
@@ -49,32 +49,41 @@ defmodule NimblePool do
@doc """
Checks a worker out.

It receives the `command`, given to on `checkout!/4` and it must
return either `{:ok, client_state, worker_state}` or `{:remove, reason}`.
It receives `maybe_wrapped_command`. The `command` is given to the `checkout!/4`
call and may optionally be wrapped by `c:handle_enqueue/2`. It must return either
`{:ok, client_state, worker_state, pool_state}`, `{:remove, reason, pool_state}`,
or `{:skip, Exception.t(), pool_state}`.

If `:remove` is returned, `NimblePool` will attempt to checkout another
worker.

If `:skip` is returned, `NimblePool` will skip the checkout, the client will
raise the returned exception, and the worker will be left ready for the next
checkout attempt.

Note this callback is synchronous and therefore will block the pool.
Avoid performing long work in here; instead, do as much work as
possible on the client.
"""
@callback handle_checkout(command :: term, from, worker_state) ::
{:ok, client_state, worker_state} | {:remove, user_reason}
@callback handle_checkout(maybe_wrapped_command :: term, from, worker_state, pool_state) ::
{:ok, client_state, worker_state, pool_state}
| {:remove, user_reason, pool_state}
| {:skip, Exception.t(), pool_state}
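
As an illustration only (not part of this diff), a worker module that does
`@behaviour NimblePool` for a hypothetical connection pool might implement the
new four-argument callback roughly as follows; the `:checkout` command and the
`conn` worker state are assumptions:

    @impl NimblePool
    def handle_checkout(:checkout, _from, conn, pool_state) do
      # Hand the connection to the client and also keep it as the worker
      # state, threading the pool state through unchanged.
      {:ok, conn, conn, pool_state}
    end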

@doc """
Checks a worker in.

It receives the `client_state`, returned by the `checkout!/4` anonymous
function and it must return either `{:ok, worker_state}` or `{:remove, reason}`.
function and it must return either `{:ok, worker_state, pool_state}` or `{:remove, reason, pool_state}`.

Note this callback is synchronous and therefore will block the pool.
Avoid performing long work in here; instead, do as much work as
possible on the client.

This callback is optional.
"""
@callback handle_checkin(client_state, from, worker_state) ::
{:ok, worker_state} | {:remove, user_reason}
@callback handle_checkin(client_state, from, worker_state, pool_state) ::
{:ok, worker_state, pool_state} | {:remove, user_reason, pool_state}
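
Again purely as a sketch (not part of this diff), a worker could implement the
four-argument check-in by keeping the client's state as the new worker state
and threading the pool state through:

    @impl NimblePool
    def handle_checkin(client_state, _from, _worker_state, pool_state) do
      # Keep whatever the client returned; a real worker might instead
      # return {:remove, reason, pool_state} when the resource has gone bad.
      {:ok, client_state, pool_state}
    end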

@doc """
Receives a message in the worker.
@@ -90,6 +99,24 @@ defmodule NimblePool do
@callback handle_info(message :: term, worker_state) ::
{:ok, worker_state} | {:remove, user_reason}

@doc """
Executed by the pool whenever a request to check out a worker is enqueued.

The `command` argument should be treated as an opaque value, but it can be
wrapped with some data to be used in `c:handle_checkout/4`.

It must return either `{:ok, maybe_wrapped_command, pool_state}` or, if the
checkout is to be skipped, `{:skip, Exception.t(), pool_state}`.

Note this callback is synchronous and therefore will block the pool.
Avoid performing long work in here.

This callback is optional.
"""
@callback handle_enqueue(command :: term, pool_state) ::
{:ok, maybe_wrapped_command :: term, pool_state}
| {:skip, Exception.t(), pool_state}
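
For illustration (not part of this diff), a worker might wrap the command with
an enqueue timestamp so that `c:handle_checkout/4` can observe how long the
request waited in the queue; the tuple shape is an assumption:

    @impl NimblePool
    def handle_enqueue(command, pool_state) do
      # Wrap the opaque command with the enqueue time and thread the
      # pool state through unchanged.
      {:ok, {System.monotonic_time(), command}, pool_state}
    end

A worker doing this would then pattern-match the wrapped tuple in its
`c:handle_checkout/4` clause.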

@doc """
Terminates a worker.

@@ -117,7 +144,11 @@ defmodule NimblePool do
) ::
{:ok, pool_state}

@optional_callbacks init_pool: 1, handle_checkin: 3, handle_info: 2, terminate_worker: 3
@optional_callbacks init_pool: 1,
handle_checkin: 4,
handle_info: 2,
handle_enqueue: 2,
terminate_worker: 3

@doc """
Defines a pool to be started under the supervision tree.
@@ -178,14 +209,14 @@ defmodule NimblePool do
@doc """
Checks out from the pool.

It expects a command, which will be passed to the `c:handle_checkout/3`
callback. The `c:handle_checkout/3` callback will return a client state,
It expects a command, which will be passed to the `c:handle_checkout/4`
callback. The `c:handle_checkout/4` callback will return a client state,
which is given to the `function`.

The `function` receives two arguments, the pool reference and the `client_state`, and must return
a two-element tuple, where the first element is the function return value,
and the second element is the updated `client_state`, which will be given
as the first argument to `c:handle_checkin/3`.
as the first argument to `c:handle_checkin/4`.

`checkout!` also accepts an optional `timeout` value, which is applied to the
checkout operation itself; `checkin` happens asynchronously.
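
A client-side call might then look like the sketch below; the pool name, the
`:checkout` command, and the `do_work/1` helper are assumptions for
illustration, not part of this diff:

    NimblePool.checkout!(MyPool, :checkout, fn _pool_ref, client_state ->
      # Do the expensive work on the client side, then return the result
      # together with the (possibly updated) client state.
      result = do_work(client_state)
      {result, client_state}
    end)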
@@ -202,6 +233,9 @@ defmodule NimblePool do
send_call(pid, ref, {:checkout, command, deadline(timeout)})

receive do
{^ref, {:skipped, exception}} ->
raise exception

{^ref, client_state} ->
Process.demonitor(ref, [:flush])

@@ -296,7 +330,15 @@ defmodule NimblePool do
requests = Map.put(requests, ref, {pid, mon_ref, :command, command, deadline})
monitors = Map.put(monitors, mon_ref, ref)
state = %{state | requests: requests, monitors: monitors}
{:noreply, maybe_checkout(command, mon_ref, deadline, from, state)}

case handle_enqueue(command, state) do
{:ok, command, state} ->
{:noreply, maybe_checkout(command, mon_ref, deadline, from, state)}

{:skip, exception, state} ->
state = remove_request(state, ref, mon_ref)
{:reply, {:skipped, exception}, state}
end
end

@impl true
@@ -315,33 +357,29 @@ defmodule NimblePool do

@impl true
def handle_info({__MODULE__, :checkin, ref, worker_client_state}, state) do
%{requests: requests, resources: resources, worker: worker, monitors: monitors} = state
%{requests: requests, resources: resources, worker: worker} = state

case requests do
%{^ref => {pid, mon_ref, :state, worker_server_state}} ->
checkin =
if function_exported?(worker, :handle_checkin, 3) do
args = [worker_client_state, {pid, ref}, worker_server_state]
apply_worker_callback(worker, :handle_checkin, args)
if function_exported?(worker, :handle_checkin, 4) do
args = [worker_client_state, {pid, ref}, worker_server_state, state]
apply_worker_callback_with_state(state, :handle_checkin, args)
else
{:ok, worker_server_state}
{:ok, worker_server_state, state}
end

{resources, state} =
case checkin do
{:ok, worker_server_state} ->
{:ok, worker_server_state, state} ->
{:queue.in(worker_server_state, resources), state}

{:remove, reason} ->
{resources, remove(reason, worker_server_state, state)}
{:remove, reason, state} ->
{resources, remove_worker(reason, worker_server_state, state)}
end

Process.demonitor(mon_ref, [:flush])
monitors = Map.delete(monitors, mon_ref)
requests = Map.delete(requests, ref)

state = %{state | requests: requests, monitors: monitors, resources: resources}
{:noreply, maybe_checkout(state)}
state = remove_request(state, ref, mon_ref)
{:noreply, maybe_checkout(%{state | resources: resources})}

%{} ->
exit(:unexpected_checkin)
@@ -434,25 +472,16 @@ defmodule NimblePool do
{:noreply, %{state | resources: resources, async: async, state: pool_state}}
end

defp cancel_request_ref(ref, reason, state) do
%{resources: resources, requests: requests, monitors: monitors} = state

defp cancel_request_ref(ref, reason, %{requests: requests} = state) do
case requests do
# Exited or timed out before we could serve it
%{^ref => {_, mon_ref, :command, _, _}} ->
Process.demonitor(mon_ref, [:flush])
monitors = Map.delete(monitors, mon_ref)
requests = Map.delete(requests, ref)
{:noreply, %{state | requests: requests, monitors: monitors}}
%{^ref => {_, mon_ref, :command, _command, _deadline}} ->
{:noreply, remove_request(state, ref, mon_ref)}

# Exited or errored during client processing
%{^ref => {_, mon_ref, :state, worker_server_state}} ->
Process.demonitor(mon_ref, [:flush])
monitors = Map.delete(monitors, mon_ref)
requests = Map.delete(requests, ref)
state = remove(reason, worker_server_state, state)
state = %{state | requests: requests, monitors: monitors, resources: resources}
{:noreply, state}
state = remove_request(state, ref, mon_ref)
Reviewer comment: Good call on remove_request/3!

{:noreply, remove_worker(reason, worker_server_state, state)}

%{} ->
exit(:unexpected_remove)
@@ -471,7 +500,7 @@ defmodule NimblePool do
{:queue.in(worker_server_state, resources), state}

{:remove, reason} ->
{resources, remove(reason, worker_server_state, state)}
{resources, remove_worker(reason, worker_server_state, state)}
end
end)

@@ -507,30 +536,33 @@ defmodule NimblePool do
%{resources: resources, requests: requests, worker: worker, queue: queue} = state

if past_deadline?(deadline) do
requests = Map.delete(state.requests, ref)
monitors = Map.delete(state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
maybe_checkout(%{state | requests: requests, monitors: monitors})
state = remove_request(state, ref, mon_ref)
maybe_checkout(state)
else
case :queue.out(resources) do
{{:value, worker_server_state}, resources} ->
args = [command, from, worker_server_state]
args = [command, from, worker_server_state, state]

case apply_worker_callback(worker, :handle_checkout, args) do
{:ok, worker_client_state, worker_server_state} ->
case apply_worker_callback_with_state(state, :handle_checkout, args) do
{:ok, worker_client_state, worker_server_state, state} ->
GenServer.reply({pid, ref}, worker_client_state)

requests = Map.put(requests, ref, {pid, mon_ref, :state, worker_server_state})
%{state | resources: resources, requests: requests}

{:remove, reason} ->
state = remove(reason, worker_server_state, state)
{:remove, reason, state} ->
state = remove_worker(reason, worker_server_state, state)
maybe_checkout(command, mon_ref, deadline, from, %{state | resources: resources})

{:skip, exception, state} ->
GenServer.reply({pid, ref}, {:skipped, exception})
remove_request(state, ref, mon_ref)

other ->
raise """
unexpected return from #{inspect(worker)}.handle_checkout/3.
unexpected return from #{inspect(worker)}.handle_checkout/4.

Expected: {:ok, client_state, server_state} | {:remove, reason}
Expected: {:ok, client_state, server_state, pool_state} | {:remove, reason, pool_state} | {:skip, Exception.t(), pool_state}
Got: #{inspect(other)}
"""
end
@@ -545,9 +577,9 @@ defmodule NimblePool do
System.monotonic_time() >= deadline
end

defp past_deadline?(_), do: false
defp past_deadline?(:infinity), do: false

defp remove(reason, worker_server_state, state) do
defp remove_worker(reason, worker_server_state, state) do
state = maybe_terminate_worker(reason, worker_server_state, state)
schedule_init()
state
@@ -614,6 +646,17 @@ defmodule NimblePool do
end

defp apply_worker_callback(worker, fun, args) do
do_apply_worker_callback(worker, fun, args)
end

defp apply_worker_callback_with_state(%{worker: worker} = pool_state, fun, args) do
case do_apply_worker_callback(worker, fun, args) do
{:remove, reason} -> {:remove, reason, pool_state}
result -> result
end
end

defp do_apply_worker_callback(worker, fun, args) do
try do
apply(worker, fun, args)
catch
@@ -634,4 +677,19 @@ defmodule NimblePool do

defp crash_reason(:throw, value), do: {:nocatch, value}
defp crash_reason(_, value), do: value

defp remove_request(pool_state, ref, mon_ref) do
requests = Map.delete(pool_state.requests, ref)
monitors = Map.delete(pool_state.monitors, mon_ref)
Process.demonitor(mon_ref, [:flush])
%{pool_state | requests: requests, monitors: monitors}
end

defp handle_enqueue(command, %{worker: worker} = pool_state) do
if function_exported?(worker, :handle_enqueue, 2) do
apply_worker_callback(worker, :handle_enqueue, [command, pool_state])
else
{:ok, command, pool_state}
end
end
end