Skip to content

Commit

Permalink
fix: added support for read preference specified by the URL (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
zookzook authored Sep 17, 2023
1 parent be592e3 commit 9e4d569
Show file tree
Hide file tree
Showing 8 changed files with 248 additions and 117 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -798,15 +798,15 @@ a simple map, supporting the following keys:

* `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest`
* `:max_staleness_ms`, the maxStaleness value in milliseconds
* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]`
* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]`

The driver selects the server using the read preference.

```elixr
prefs = %{
mode: :secondary,
max_staleness_ms: 120_000,
tag_sets: [dc: "west", usage: "production"]
tags: [dc: "west", usage: "production"]
}
Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs)
Expand Down
147 changes: 77 additions & 70 deletions lib/mongo/read_preference.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,134 +4,141 @@ defmodule Mongo.ReadPreference do
@moduledoc ~S"""
Determines which servers are considered suitable for read operations
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
If hedge is set, it configures how server hedged reads are used.
The default mode is `:primary`.
The default tag_sets is a list with an empty tag set: [{}].
The default tags is a list with an empty tag set: [{}].
The default max_staleness_ms is unset.
The default hedge is unset.
## mode
* `:primary` Only an available primary is suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
but only eligible secondaries are suitable.
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
Otherwise, when there are no eligible secondaries, the primary is suitable.
* `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.
"""
@type t :: %{
mode:
:primary
| :secondary
| :primary_preferred
| :secondary_preferred
| :nearest,
tag_sets: [%{String.t() => String.t()}],
max_staleness_ms: non_neg_integer,
hedge: BSON.document()
}

@primary %{
mode: :primary,
tag_sets: [],
tags: [],
max_staleness_ms: 0
}

def primary(map \\ nil)
@doc """
Merge default values to the read preferences and converts deprecated tag_sets to tags
"""
def merge_defaults(%{tag_sets: tags} = map) do
map =
map
|> Map.delete(:tag_sets)
|> Map.put(:tags, tags)

Map.merge(@primary, map)
end

def primary(map) when is_map(map) do
def merge_defaults(map) when is_map(map) do
Map.merge(@primary, map)
end

def primary(_), do: @primary
def merge_defaults(_other) do
@primary
end

@doc """
Add read preference to the cmd
"""
def add_read_preference(cmd, opts) do
case Keyword.get(opts, :read_preference) do
nil -> cmd
pref -> cmd ++ ["$readPreference": pref]
nil ->
cmd

pref ->
cmd ++ ["$readPreference": pref]
end
end

@doc """
From the specs:
Use of slaveOk
There are two usages of slaveOK:
* A driver query parameter that predated read preference modes and tag set lists.
* A wire protocol flag on OP_QUERY operations
Converts the preference to the mongodb format for replica sets
"""
def slave_ok(%{:mode => :primary}) do
%{:mode => :primary}
def to_replica_set(%{:mode => :primary}) do
%{mode: :primary}
end

def slave_ok(config) do
def to_replica_set(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
end
:primary_preferred ->
:primaryPreferred

filter_nils(mode: mode, tag_sets: config[:tag_sets])
end
:secondary_preferred ->
:secondaryPreferred

##
# Therefore, when sending queries to a mongos, the following rules apply:
#
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
def mongos(%{mode: :primary}) do
nil
end
other ->
other
end

# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :secondary} = config) do
transform(config)
end
case config[:tags] do
[] ->
%{mode: mode}

# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :primary_preferred} = config) do
transform(config)
end
nil ->
%{mode: mode}

# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
def mongos(%{mode: :secondary_preferred} = config) do
transform(config)
tags ->
%{mode: mode, tags: [tags]}
end
end

# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
def mongos(%{mode: :nearest} = config) do
transform(config)
@doc """
Converts the preference to the mongodb format for mongos
"""
def to_mongos(%{mode: :primary}) do
nil
end

defp transform(config) do
# for the others we should use the read preferences
def to_mongos(config) do
mode =
case config[:mode] do
:primary_preferred -> :primaryPreferred
:secondary_preferred -> :secondaryPreferred
other -> other
:primary_preferred ->
:primaryPreferred

:secondary_preferred ->
:secondaryPreferred

other ->
other
end

max_staleness_seconds =
case config[:max_staleness_ms] do
i when is_integer(i) -> div(i, 1000)
nil -> nil
i when is_integer(i) ->
div(i, 1000)

nil ->
nil
end

read_preference =
case config[:tags] do
[] ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

nil ->
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}

tags ->
%{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
end

[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
|> filter_nils()
filter_nils(read_preference)
end
end
14 changes: 14 additions & 0 deletions lib/mongo/topology.ex
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,8 @@ defmodule Mongo.Topology do
# checkout a new session
#
def handle_call({:checkout_session, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: read_write_type, topology: topology, opts: opts})
Expand All @@ -398,6 +400,8 @@ defmodule Mongo.Topology do
end

def handle_call({:select_server, read_write_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
opts = merge_read_preferences(opts, state.opts)

case TopologyDescription.select_servers(topology, read_write_type, opts) do
:empty ->
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: read_write_type, topology: topology, opts: opts})
Expand Down Expand Up @@ -579,4 +583,14 @@ defmodule Mongo.Topology do
defp fetch_arbiters(state) do
Enum.flat_map(state.topology.servers, fn {_, s} -> s.arbiters end)
end

defp merge_read_preferences(opts, url_opts) do
case Keyword.get(url_opts, :read_preference) do
nil ->
opts

read_preference ->
Keyword.put_new(opts, :read_preference, read_preference)
end
end
end
Loading

0 comments on commit 9e4d569

Please sign in to comment.