diff --git a/config/test.exs b/config/test.exs index 2096eb8..9e8669a 100644 --- a/config/test.exs +++ b/config/test.exs @@ -1,9 +1,9 @@ use Mix.Config config :redix_cluster, - cluster_nodes: [%{host: '10.1.2.7', port: 7000}, - %{host: '10.1.2.6', port: 7000}, - %{host: '10.1.2.5', port: 7000} + cluster_nodes: [%{host: "127.0.0.1", port: 7000}, + %{host: "127.0.0.1", port: 7001}, + %{host: "127.0.0.1", port: 7002} ], pool_size: 5, pool_max_overflow: 0, diff --git a/lib/redix_cluster.ex b/lib/redix_cluster.ex index 198ee0c..6c3c932 100644 --- a/lib/redix_cluster.ex +++ b/lib/redix_cluster.ex @@ -154,6 +154,8 @@ defmodule RedixCluster do {:ok, [Redix.Protocol.redis_value]} | {:error, term} def transaction(commands, opts\\ []), do: transaction(commands, opts, 0) + def flushdb(), do: RedixCluster.Run.flushdb() + @doc """ `Make sure` CROSSSLOT Keys in request hash to the same slot diff --git a/lib/redix_cluster/exceptions.ex b/lib/redix_cluster/exceptions.ex index 51359a7..8f08399 100644 --- a/lib/redix_cluster/exceptions.ex +++ b/lib/redix_cluster/exceptions.ex @@ -14,8 +14,8 @@ defmodule RedixCluster.Error do def exception(:no_support_transaction), do: %__MODULE__{message: "cluster pipeline don't support MULTI, using transation"} - def exception(other)when is_atom(other), do: %Redix.ConnectionError{message: :inet.format_error(other)} + def exception(other)when is_atom(other), do: %Redix.ConnectionError{reason: :inet.format_error(other)} - @type t :: %__MODULE__{message: binary} | %Redix.ConnectionError{message: binary} + @type t :: %__MODULE__{message: binary} | %Redix.ConnectionError{reason: binary} end diff --git a/lib/redix_cluster/hash.ex b/lib/redix_cluster/hash.ex index 19188f3..3b8cd34 100644 --- a/lib/redix_cluster/hash.ex +++ b/lib/redix_cluster/hash.ex @@ -1,55 +1,15 @@ defmodule RedixCluster.Hash do @moduledoc false - use Bitwise - @redis_cluster_hash_slots 16384 - @crcdef <<0x00,0x00,0x10,0x21,0x20,0x42,0x30,0x63,0x40,0x84,0x50,0xa5,0x60,0xc6,0x70,0xe7, - 0x81,0x08,0x91,0x29,0xa1,0x4a,0xb1,0x6b,0xc1,0x8c,0xd1,0xad,0xe1,0xce,0xf1,0xef, - 0x12,0x31,0x02,0x10,0x32,0x73,0x22,0x52,0x52,0xb5,0x42,0x94,0x72,0xf7,0x62,0xd6, - 0x93,0x39,0x83,0x18,0xb3,0x7b,0xa3,0x5a,0xd3,0xbd,0xc3,0x9c,0xf3,0xff,0xe3,0xde, - 0x24,0x62,0x34,0x43,0x04,0x20,0x14,0x01,0x64,0xe6,0x74,0xc7,0x44,0xa4,0x54,0x85, - 0xa5,0x6a,0xb5,0x4b,0x85,0x28,0x95,0x09,0xe5,0xee,0xf5,0xcf,0xc5,0xac,0xd5,0x8d, - 0x36,0x53,0x26,0x72,0x16,0x11,0x06,0x30,0x76,0xd7,0x66,0xf6,0x56,0x95,0x46,0xb4, - 0xb7,0x5b,0xa7,0x7a,0x97,0x19,0x87,0x38,0xf7,0xdf,0xe7,0xfe,0xd7,0x9d,0xc7,0xbc, - 0x48,0xc4,0x58,0xe5,0x68,0x86,0x78,0xa7,0x08,0x40,0x18,0x61,0x28,0x02,0x38,0x23, - 0xc9,0xcc,0xd9,0xed,0xe9,0x8e,0xf9,0xaf,0x89,0x48,0x99,0x69,0xa9,0x0a,0xb9,0x2b, - 0x5a,0xf5,0x4a,0xd4,0x7a,0xb7,0x6a,0x96,0x1a,0x71,0x0a,0x50,0x3a,0x33,0x2a,0x12, - 0xdb,0xfd,0xcb,0xdc,0xfb,0xbf,0xeb,0x9e,0x9b,0x79,0x8b,0x58,0xbb,0x3b,0xab,0x1a, - 0x6c,0xa6,0x7c,0x87,0x4c,0xe4,0x5c,0xc5,0x2c,0x22,0x3c,0x03,0x0c,0x60,0x1c,0x41, - 0xed,0xae,0xfd,0x8f,0xcd,0xec,0xdd,0xcd,0xad,0x2a,0xbd,0x0b,0x8d,0x68,0x9d,0x49, - 0x7e,0x97,0x6e,0xb6,0x5e,0xd5,0x4e,0xf4,0x3e,0x13,0x2e,0x32,0x1e,0x51,0x0e,0x70, - 0xff,0x9f,0xef,0xbe,0xdf,0xdd,0xcf,0xfc,0xbf,0x1b,0xaf,0x3a,0x9f,0x59,0x8f,0x78, - 0x91,0x88,0x81,0xa9,0xb1,0xca,0xa1,0xeb,0xd1,0x0c,0xc1,0x2d,0xf1,0x4e,0xe1,0x6f, - 0x10,0x80,0x00,0xa1,0x30,0xc2,0x20,0xe3,0x50,0x04,0x40,0x25,0x70,0x46,0x60,0x67, - 0x83,0xb9,0x93,0x98,0xa3,0xfb,0xb3,0xda,0xc3,0x3d,0xd3,0x1c,0xe3,0x7f,0xf3,0x5e, - 0x02,0xb1,0x12,0x90,0x22,0xf3,0x32,0xd2,0x42,0x35,0x52,0x14,0x62,0x77,0x72,0x56, - 0xb5,0xea,0xa5,0xcb,0x95,0xa8,0x85,0x89,0xf5,0x6e,0xe5,0x4f,0xd5,0x2c,0xc5,0x0d, - 0x34,0xe2,0x24,0xc3,0x14,0xa0,0x04,0x81,0x74,0x66,0x64,0x47,0x54,0x24,0x44,0x05, - 0xa7,0xdb,0xb7,0xfa,0x87,0x99,0x97,0xb8,0xe7,0x5f,0xf7,0x7e,0xc7,0x1d,0xd7,0x3c, - 0x26,0xd3,0x36,0xf2,0x06,0x91,0x16,0xb0,0x66,0x57,0x76,0x76,0x46,0x15,0x56,0x34, - 0xd9,0x4c,0xc9,0x6d,0xf9,0x0e,0xe9,0x2f,0x99,0xc8,0x89,0xe9,0xb9,0x8a,0xa9,0xab, - 0x58,0x44,0x48,0x65,0x78,0x06,0x68,0x27,0x18,0xc0,0x08,0xe1,0x38,0x82,0x28,0xa3, - 0xcb,0x7d,0xdb,0x5c,0xeb,0x3f,0xfb,0x1e,0x8b,0xf9,0x9b,0xd8,0xab,0xbb,0xbb,0x9a, - 0x4a,0x75,0x5a,0x54,0x6a,0x37,0x7a,0x16,0x0a,0xf1,0x1a,0xd0,0x2a,0xb3,0x3a,0x92, - 0xfd,0x2e,0xed,0x0f,0xdd,0x6c,0xcd,0x4d,0xbd,0xaa,0xad,0x8b,0x9d,0xe8,0x8d,0xc9, - 0x7c,0x26,0x6c,0x07,0x5c,0x64,0x4c,0x45,0x3c,0xa2,0x2c,0x83,0x1c,0xe0,0x0c,0xc1, - 0xef,0x1f,0xff,0x3e,0xcf,0x5d,0xdf,0x7c,0xaf,0x9b,0xbf,0xba,0x8f,0xd9,0x9f,0xf8, - 0x6e,0x17,0x7e,0x36,0x4e,0x55,0x5e,0x74,0x2e,0x93,0x3e,0xb2,0x0e,0xd1,0x1e,0xf0>> - - @spec hash(binary) :: integer - def hash(key) when is_binary(key), do: key |> to_char_list |> hash - def hash(key), do: crc16(0, key) |>rem @redis_cluster_hash_slots - defp crc16(crc, []), do: crc - defp crc16(crc, [b | rest]) do - index = bsr(crc, 8)|> bxor(b) |> band(0xff) - bsl(crc, 8)|> band(0xffff) |> bxor(crc_index(index)) |> crc16(rest) - end + ## CRCBench + # benchmark name iterations average time + # CRC 100 20819.58 µs/op + # RedixClusterCRC 10 126367.30 µs/op - defp crc_index(n) do - <> = :binary.part(@crcdef, n*2, 2) - crc - end + @spec hash(binary) :: integer + def hash(key) when is_list(key), do: key |> to_string |> hash + def hash(key), do: CRC.ccitt_16_xmodem(key) |>rem(@redis_cluster_hash_slots) end diff --git a/lib/redix_cluster/monitor.ex b/lib/redix_cluster/monitor.ex index b9806d6..e44c02a 100644 --- a/lib/redix_cluster/monitor.ex +++ b/lib/redix_cluster/monitor.ex @@ -13,30 +13,30 @@ defmodule RedixCluster.Monitor do def connect(cluster_nodes), do: GenServer.call(__MODULE__, {:connect, cluster_nodes}) @spec refresh_mapping(integer) :: :ok | {:ignore, String.t} - def refresh_mapping(version), do: GenServer.call(__MODULE__, {:reload_slots_map, version}) + def refresh_mapping(version) do + result = GenServer.call(__MODULE__, {:reload_slots_map, version}) + RedixCluster.SlotCache.refresh_mapping(version) + result + end - @spec get_slot_cache() :: {:cluster, [binary], [integer], integer} | {:not_cluster, integer, atom} def get_slot_cache() do - [{:cluster_state, state}] = :ets.lookup(__MODULE__, :cluster_state) - case state.is_cluster do - true -> {:cluster, state.slots_maps, state.slots, state.version} - false -> - [slots_map] = state.slots_maps - {:not_cluster, state.version, slots_map.node.pool} - end + GenServer.call(__MODULE__, {:get_slot}) end @spec start_link(Keyword.t) :: GenServer.on_start def start_link(options), do: GenServer.start_link(__MODULE__, nil, options) def init(nil) do - :ets.new(__MODULE__, [:protected, :set, :named_table, {:read_concurrency, true}]) case get_env(:cluster_nodes, []) do [] -> {:ok, %State{}} cluster_nodes -> {:ok, do_connect(cluster_nodes)} end end + def handle_call({:get_slot}, _from, state) do + {:reply, state, state} + end + def handle_call({:reload_slots_map, version}, _from, state = %State{version: version}) do {:reply, :ok, reload_slots_map(state)} end @@ -56,13 +56,21 @@ defmodule RedixCluster.Monitor do end defp reload_slots_map(state) do - for slots_map <- state.slots_maps, do: close_connection(slots_map) + old_slots_maps = state.slots_maps {is_cluster, cluster_info} = get_cluster_info(state.cluster_nodes) slots_maps = cluster_info |> parse_slots_maps |> connect_all_slots slots = create_slots_cache(slots_maps) - new_state = %State{state | slots: slots, slots_maps: slots_maps, version: state.version + 1, is_cluster: is_cluster} - true = :ets.insert(__MODULE__, [{:cluster_state, new_state}]) - new_state + + # close only removed pool + removed = removed_nodes(old_slots_maps, slots_maps) + for slots_map <- removed, do: close_connection(slots_map) + + %State{state | slots: slots, slots_maps: slots_maps, version: state.version + 1, is_cluster: is_cluster} + end + + defp removed_nodes(old_slots_maps, slots_maps) do + new_pools = slots_maps |> Enum.map(fn(slot) -> slot.node.pool end) + old_slots_maps |> Enum.reject(fn(slot) -> slot.node.pool in new_pools end) end defp close_connection(slots_map) do @@ -77,13 +85,15 @@ defmodule RedixCluster.Monitor do defp get_cluster_info([node|restnodes]) do case start_link_redix(node.host, node.port) do {:ok, conn} -> - case Redix.command(conn, ~w(CLUSTER SLOTS), []) do - {:ok, cluster_info} -> - Redix.stop(conn) - {true, cluster_info} - {:error, %Redix.Error{message: "ERR unknown command 'CLUSTER'"}} -> - cluster_info_from_single_node(node) - {:error, %Redix.Error{message: "ERR This instance has cluster support disabled"}} -> + try do + case Redix.command(conn, ~w(CLUSTER SLOTS), []) do + {:ok, cluster_info} -> + Redix.stop(conn) + {true, cluster_info} + {:error, _} -> get_cluster_info(restnodes) + end + rescue + Redix.Error -> cluster_info_from_single_node(node) end _ -> get_cluster_info(restnodes) @@ -93,7 +103,7 @@ defmodule RedixCluster.Monitor do #[[10923, 16383, ["Host1", 7000], ["SlaveHost1", 7001]], #[5461, 10922, ["Host2", 7000], ["SlaveHost2", 7001]], #[0, 5460, ["Host3", 7000], ["SlaveHost3", 7001]]] - defp parse_slots_maps(cluster_info) do + def parse_slots_maps(cluster_info) do cluster_info |> Stream.with_index |> Stream.map(&parse_cluster_slot/1) @@ -128,7 +138,7 @@ defmodule RedixCluster.Monitor do {false, [[0, @redis_cluster_hash_slots - 1, - [List.to_string(node.host), node.port] + [node.host, node.port] ]] } end @@ -156,11 +166,10 @@ defmodule RedixCluster.Monitor do |> String.to_atom end - defp parse_master_node([[master_host, master_port]|_]) do - %{host: to_char_list(master_host), + defp parse_master_node([[master_host, master_port|_]|_]) do + %{host: master_host, port: master_port, pool: nil } end - end diff --git a/lib/redix_cluster/run.ex b/lib/redix_cluster/run.ex index aa0442a..ee8adb7 100644 --- a/lib/redix_cluster/run.ex +++ b/lib/redix_cluster/run.ex @@ -5,47 +5,44 @@ defmodule RedixCluster.Run do @spec command(command, Keyword.t) :: {:ok, term} |{:error, term} def command(command, opts) do - case RedixCluster.Monitor.get_slot_cache do - {:cluster, slots_maps, slots, version} -> - command - |> parse_key_from_command - |> key_to_slot_hash - |> get_pool_by_slot(slots_maps, slots, version) - |> query_redis_pool(command, :command, opts) - {:not_cluster, version, pool_name} -> - query_redis_pool({version, pool_name}, command, :command, opts) - end + command + |> parse_key_from_command() + |> key_to_slot_hash() + |> RedixCluster.SlotCache.get_pool() + |> query_redis_pool(command, :command, opts) end @spec pipeline([command], Keyword.t) :: {:ok, term} |{:error, term} def pipeline(pipeline, opts) do - case RedixCluster.Monitor.get_slot_cache do - {:cluster, slots_maps, slots, version} -> - pipeline - |> parse_keys_from_pipeline - |> keys_to_slot_hashs - |> is_same_slot_hashs - |> get_pool_by_slot(slots_maps, slots, version) - |> query_redis_pool(pipeline, :pipeline, opts) - {:not_cluster, version, pool_name} -> - query_redis_pool({version, pool_name}, pipeline, :pipeline, opts) - end + pipeline + |> parse_keys_from_pipeline() + |> keys_to_slot_hashs() + |> is_same_slot_hashs() + |> RedixCluster.SlotCache.get_pool() + |> query_redis_pool(pipeline, :pipeline, opts) end @spec transaction([command], Keyword.t) :: {:ok, term} |{:error, term} def transaction(pipeline, opts) do transaction = [["MULTI"]] ++ pipeline ++ [["EXEC"]] - case RedixCluster.Monitor.get_slot_cache do - {:cluster, slots_maps, slots, version} -> - pipeline - |> parse_keys_from_pipeline - |> keys_to_slot_hashs - |> is_same_slot_hashs - |> get_pool_by_slot(slots_maps, slots, version) - |> query_redis_pool(transaction, :pipeline, opts) - {:not_cluster, version, pool_name} -> - query_redis_pool({version, pool_name}, transaction, :pipeline, opts) - end + + pipeline + |> parse_keys_from_pipeline() + |> keys_to_slot_hashs() + |> is_same_slot_hashs() + |> RedixCluster.SlotCache.get_pool() + |> query_redis_pool(transaction, :pipeline, opts) + end + + def flushdb() do + {version, slots_maps} = RedixCluster.SlotCache.get_slot_maps + Enum.each(slots_maps, fn(cluster) -> + case cluster == nil or cluster.node == nil do + true -> nil + false -> query_redis_pool({version, cluster.node.pool}, ~w(flushdb), :command, []) + end + end) + {:ok, "OK"} end defp parse_key_from_command([term1, term2|_]), do: verify_command_key(term1, term2) @@ -103,7 +100,7 @@ defmodule RedixCluster.Run do try do pool_name |> :poolboy.transaction(fn(worker) -> GenServer.call(worker, {type, command, opts}) end) - |> parse_trans_result(version) + |> parse_trans_result({version, pool_name}, command, type, opts) catch :exit, _ -> RedixCluster.Monitor.refresh_mapping(version) @@ -111,15 +108,34 @@ defmodule RedixCluster.Run do end end - defp parse_trans_result({:error, %Redix.Error{message: <<"MOVED", _redirectioninfo::binary>>}}, version) do + defp parse_trans_result({:error, %Redix.Error{message: <<"ASK", redirectioninfo::binary>>}}, {version, _pool_name}, command, type, opts) do + [_, _slot, host_info] = Regex.split(~r/\s+/, redirectioninfo) + [host, port] = Regex.split(~r/:/, host_info) + RedixCluster.Pools.Supervisor.new_pool(host, port) + pool_name = ["Pool", host, ":", port] |> Enum.join |> String.to_atom + query_redis_pool({version, pool_name}, command, type, opts) + end + defp parse_trans_result({:error, %Redix.Error{message: <<"MOVED", _redirectioninfo::binary>>}}, {version, _pool_name}, _command, _type, _opts) do + RedixCluster.Monitor.refresh_mapping(version) + {:error, :retry} + end + defp parse_trans_result({:error, :no_connection}, {version, _pool_name}, _command, _type, _opts) do + RedixCluster.Monitor.refresh_mapping(version) + {:error, :retry} + end + defp parse_trans_result({:error, :closed}, {version, _pool_name}, _command, _type, _opts) do + RedixCluster.Monitor.refresh_mapping(version) + {:error, :retry} + end + defp parse_trans_result({:error, %Redix.ConnectionError{}}, {version, _pool_name}, _command, _type, _opts) do RedixCluster.Monitor.refresh_mapping(version) {:error, :retry} end - defp parse_trans_result({:error, :no_connection}, version) do + defp parse_trans_result({:error, %Redix.Error{message: <<"CLUSTERDOWN", _::binary>>}}, {version, _pool_name}, _command, _type, _opts) do RedixCluster.Monitor.refresh_mapping(version) {:error, :retry} end - defp parse_trans_result(payload, _), do: payload + defp parse_trans_result(payload, _, _, _, _), do: payload defp verify_command_key(term1, term2) do term1 diff --git a/lib/redix_cluster/slot_cache.ex b/lib/redix_cluster/slot_cache.ex new file mode 100644 index 0000000..e65ff73 --- /dev/null +++ b/lib/redix_cluster/slot_cache.ex @@ -0,0 +1,76 @@ +defmodule RedixCluster.SlotCache do + @moduledoc false + + use GenServer + use RedixCluster.Helper + + def process_num do + get_env(:worker_num, 2) * 2 + end + + def process_name(num) do + Module.concat(__MODULE__, Integer.to_string(num)) + end + + def refresh_mapping(version) do + Enum.each(1..process_num(), fn(x) -> + target_process = process_name(x) + GenServer.call(target_process, {:reload_slots_map, version}) + end) + end + + def get_pool({:error, _} = error), do: error + def get_pool(slot) do + target_process = process_name(Enum.random(1..process_num())) + GenServer.call(target_process, {:get_pool, slot}) + end + + ### for spec + def get_slot_maps() do + target_process = process_name(Enum.random(1..process_num())) + GenServer.call(target_process, {:get_slot_maps}) + end + + def start_link(name) do + GenServer.start_link(__MODULE__, nil, [name: name]) + end + + def init(nil) do + {:ok, RedixCluster.Monitor.get_slot_cache} + end + + def handle_call({:get_pool, slot}, _from, state) do + case state.is_cluster do + true -> + {:reply, {state.version, get_pool_by_slot(slot, state.slots_maps, state.slots)}, state} + false -> + [slots_map] = state.slots_maps + {:reply, {state.version, slots_map.node.pool}, state} + end + end + + def handle_call({:reload_slots_map, version}, _from, _state = %RedixCluster.Monitor.State{version: version}) do + {:reply, :ok, RedixCluster.Monitor.get_slot_cache} + end + def handle_call({:reload_slots_map, version}, _from, state = %RedixCluster.Monitor.State{version: old_version}) do + {:reply, {:ingore, "wrong version#{version}!=#{old_version}"}, state} + end + + def handle_call({:get_slot_maps}, _from, state) do + {:reply, {state.version, state.slots_maps}, state} + end + + def handle_call(request, _from, state), do: {:reply, {:ignored, request}, state} + def handle_cast(_msg, state), do: {:noreply, state} + def handle_info(_info, state), do: {:noreply, state} + + defp get_pool_by_slot(slot, slots_maps, slots) do + index = Enum.at(slots, slot) + cluster = Enum.at(slots_maps, index - 1) + case cluster == nil or cluster.node == nil do + true -> nil + false -> cluster.node.pool + end + end + +end diff --git a/lib/redix_cluster/supervisor.ex b/lib/redix_cluster/supervisor.ex index c81435c..e2fae9d 100644 --- a/lib/redix_cluster/supervisor.ex +++ b/lib/redix_cluster/supervisor.ex @@ -5,11 +5,18 @@ defmodule RedixCluster.Supervisor do @spec start_link() :: Supervisor.on_start def start_link() do - children = [ - supervisor(RedixCluster.Pools.Supervisor, [[name: RedixCluster.Pools.Supervisor]], [modules: :dynamic]), - worker(RedixCluster.Monitor, [[name: RedixCluster.Monitor]], [modules: :dynamic]) - ] - Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__) + children = [ + supervisor(RedixCluster.Pools.Supervisor, [[name: RedixCluster.Pools.Supervisor]], [modules: :dynamic]), + worker(RedixCluster.Monitor, [[name: RedixCluster.Monitor]], [modules: :dynamic]) + ] + + slot_cache_num = RedixCluster.SlotCache.process_num() + slot_cache_worker = Enum.map(1..slot_cache_num, fn(x) -> + name = RedixCluster.SlotCache.process_name(x) + worker(RedixCluster.SlotCache, [name], [id: name]) + end) + + Supervisor.start_link(children ++ slot_cache_worker, strategy: :one_for_one, name: __MODULE__) end end diff --git a/lib/redix_cluster/worker.ex b/lib/redix_cluster/worker.ex index bff05e8..18fd3cf 100644 --- a/lib/redix_cluster/worker.ex +++ b/lib/redix_cluster/worker.ex @@ -26,10 +26,20 @@ defmodule RedixCluster.Worker do {:reply, {:error, :no_connection}, state} end def handle_call({:command, params, opts}, _From, %{conn: conn} = state) do - {:reply, Redix.command(conn, params, opts), state} + try do + {:reply, Redix.command(conn, params, opts), state} + rescue + e in Redix.Error -> + {:reply, {:error, e}, state} + end end def handle_call({:pipeline, params, opts}, _from, %{conn: conn} = state) do - {:reply, Redix.pipeline(conn, params, opts), state} + try do + {:reply, Redix.pipeline(conn, params, opts), state} + rescue + e in Redix.Error -> + {:reply, {:error, e}, state} + end end def handle_call(_request, _from, state), do: {:reply, :ok, state} diff --git a/mix.exs b/mix.exs index 85f9648..5ef249b 100644 --- a/mix.exs +++ b/mix.exs @@ -8,7 +8,7 @@ defmodule RedixCluster.Mixfile do build_embedded: Mix.env in [:prod], start_permanent: Mix.env == :prod, preferred_cli_env: [espec: :test], - deps: deps] + deps: deps()] end # Configuration for the OTP application @@ -16,6 +16,7 @@ defmodule RedixCluster.Mixfile do # Type "mix help compile.app" for more information def application do [mod: {RedixCluster, []}, + included_applications: [:crc], applications: [:logger, :redix]] end @@ -29,10 +30,11 @@ defmodule RedixCluster.Mixfile do # # Type "mix help deps" for more examples and options defp deps do - [ {:redix, "~> 0.3.1"}, + [ {:redix, "~> 0.6.0"}, {:poolboy, "~> 1.5", override: true}, {:dialyze, "~> 0.2", only: :dev}, {:dogma, "~> 0.0", only: :dev}, + {:crc, "~> 0.5"}, {:benchfella, github: "alco/benchfella", only: :bench}, {:eredis_cluster, github: "adrienmo/eredis_cluster", only: :bench}, {:espec, github: "antonmi/espec", only: :test}, diff --git a/mix.lock b/mix.lock index c6b3487..4406929 100644 --- a/mix.lock +++ b/mix.lock @@ -1,12 +1,12 @@ -%{"benchfella": {:git, "https://github.com/alco/benchfella.git", "ddf53d16472b2dfec55066e366a70035181533cb", []}, - "connection": {:hex, :connection, "1.0.2"}, - "dialyze": {:hex, :dialyze, "0.2.0"}, - "dogma": {:hex, :dogma, "0.0.11"}, - "eredis": {:git, "https://github.com/wooga/eredis.git", "cbc013f516e464706493c01662e5e9dd82d1db01", [tag: "v1.0.8"]}, - "eredis_cluster": {:git, "https://github.com/adrienmo/eredis_cluster.git", "59d82059d14add2c24b2243084ab89165d3b1953", []}, - "espec": {:git, "https://github.com/antonmi/espec.git", "3b2dd8acec2b78c19e51f0a892f3e0b68614c68f", []}, - "ex_doc": {:hex, :ex_doc, "0.11.1"}, - "meck": {:hex, :meck, "0.8.3"}, - "poison": {:hex, :poison, "1.5.0"}, - "poolboy": {:hex, :poolboy, "1.5.1"}, - "redix": {:hex, :redix, "0.3.1"}} +%{"benchfella": {:git, "https://github.com/alco/benchfella.git", "abed12d37022012133da51b4acbbf07bd61dab32", []}, + "connection": {:hex, :connection, "1.0.4", "a1cae72211f0eef17705aaededacac3eb30e6625b04a6117c1b2db6ace7d5976", [], [], "hexpm"}, + "crc": {:hex, :crc, "0.5.2", "6db0c06f4bb2ae6a737a32b31fd40842774d4aae903b76e5f4dae44bd4b2742c", [:make, :mix], [], "hexpm"}, + "dialyze": {:hex, :dialyze, "0.2.1", "9fb71767f96649020d769db7cbd7290059daff23707d6e851e206b1fdfa92f9d", [:mix], [], "hexpm"}, + "dogma": {:hex, :dogma, "0.1.13", "7b6c6ad2b3ee6501eda3bd39e197dd5198be8d520d1c175c7f713803683cf27a", [:mix], [{:poison, ">= 2.0.0", [hex: :poison, repo: "hexpm", optional: false]}], "hexpm"}, + "eredis": {:hex, :eredis, "1.0.8", "ab4fda1c4ba7fbe6c19c26c249dc13da916d762502c4b4fa2df401a8d51c5364", [:rebar], [], "hexpm"}, + "eredis_cluster": {:git, "https://github.com/adrienmo/eredis_cluster.git", "b838f5f576b1dd0acf18a1e1189dc1b9f4f27224", []}, + "espec": {:git, "https://github.com/antonmi/espec.git", "a73b0067fac8b197533b514ac38c98f0795a024a", []}, + "meck": {:hex, :meck, "0.8.4", "59ca1cd971372aa223138efcf9b29475bde299e1953046a0c727184790ab1520", [:make, :rebar], [], "hexpm"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, + "poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [], [], "hexpm"}, + "redix": {:hex, :redix, "0.6.0", "b0ee9b66cd15b5fe72deeaba285e90b65dbf069b7be67f610d32d4304226b1b2", [:mix], [{:connection, "~> 1.0", [hex: :connection, repo: "hexpm", optional: false]}], "hexpm"}} diff --git a/spec/command_spec.exs b/spec/command_spec.exs index 3e03aa6..f0cec73 100644 --- a/spec/command_spec.exs +++ b/spec/command_spec.exs @@ -2,7 +2,7 @@ defmodule RedixCluster.Command.Spec do use ESpec before do - allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self} end, [:non_strict, :unstick] + allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self()} end, [:non_strict, :unstick] allow Redix |> to accept :stop, fn(_) -> :ok end, [:non_strict, :unstick] allow Redix |> to accept :command, fn (_, ~w(CLUSTER SLOTS), _) -> {:ok, @@ -10,6 +10,7 @@ defmodule RedixCluster.Command.Spec do [5461, 10922, ["10.1.2.6", 7000], ["10.1.2.7", 7001]], [0, 5460, ["10.1.2.7", 7000], ["10.1.2.6", 7001]]]} (_, ~w(set a test), _) -> {:ok, "OK"} + (_, ~w(flushdb), _) -> {:ok, "OK"} (_, ~w(get a), _) -> {:ok, "test"} (_, ~w(incr a), _) -> {:error, %Redix.Error{message: "ERR value is not an integer or out of range"}} end, [:non_strict, :unstick] @@ -35,4 +36,7 @@ defmodule RedixCluster.Command.Spec do end end + context "flushdb test" do + it do: expect RedixCluster.flushdb() |> to eq {:ok, "OK"} + end end diff --git a/spec/hashslot_spec.exs b/spec/hashslot_spec.exs index 560ebd2..6fccaf5 100644 --- a/spec/hashslot_spec.exs +++ b/spec/hashslot_spec.exs @@ -6,13 +6,13 @@ defmodule CreateHashSlot.Spec do "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"] it do - chars_hash |> to eq [15495, 3300, 7365, 11298, 15363, 3168, 7233, 11694, 15759, 3564, 7629, 11562, + chars_hash() |> to eq [15495, 3300, 7365, 11298, 15363, 3168, 7233, 11694, 15759, 3564, 7629, 11562, 15627, 3432, 7497, 16023, 11958, 7893, 3828, 15891, 11826, 7761, 3696, 16287, 12222, 8157] end it do - same_hash |> to eq List.duplicate(15495, 100) + same_hash() |> to eq List.duplicate(15495, 100) end defp chars_hash() do @@ -20,7 +20,7 @@ defmodule CreateHashSlot.Spec do end defp same_hash() do - for char <- List.duplicate("{a}", 100), do: RedixCluster.Run.key_to_slot_hash("#{char}+#{:random.uniform}") + for char <- List.duplicate("{a}", 100), do: RedixCluster.Run.key_to_slot_hash("#{char}+#{:rand.uniform}") end end diff --git a/spec/parse_slots_map.exs b/spec/parse_slots_map.exs new file mode 100644 index 0000000..d6054f9 --- /dev/null +++ b/spec/parse_slots_map.exs @@ -0,0 +1,16 @@ +defmodule ParseSlotsMap.Spec do + + use ESpec + + @cluster_info [[10923, 16383, ["Host1", 7000, "hashhash"], ["SlaveHost1", 7001, "hashhashaaa"]], + [5461, 10922, ["Host2", 7000, "hash"]], + [0, 5460, ["Host3", 7000, "haahaha"], ["SlaveHost3", 7001, "aaaa"]]] + + it do + RedixCluster.Monitor.parse_slots_maps(@cluster_info) |> to eq [ + %{start_slot: 10923, end_slot: 16383, index: 1, name: :"10923:16383", node: %{host: "Host1", pool: nil, port: 7000}}, + %{start_slot: 5461, end_slot: 10922, index: 2, name: :"5461:10922", node: %{host: "Host2", pool: nil, port: 7000}}, + %{start_slot: 0, end_slot: 5460, index: 3, name: :"0:5460", node: %{host: "Host3", pool: nil, port: 7000}}] + end + + end diff --git a/spec/pipeline_spec.exs b/spec/pipeline_spec.exs index ed97714..98160b7 100644 --- a/spec/pipeline_spec.exs +++ b/spec/pipeline_spec.exs @@ -2,7 +2,7 @@ defmodule RedixCluster.Pipeline.Spec do use ESpec before do - allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self} end, [:non_strict, :unstick] + allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self()} end, [:non_strict, :unstick] allow Redix |> to accept :stop, fn(_) -> :ok end, [:non_strict, :unstick] allow Redix |> to accept :command, fn (_, ~w(CLUSTER SLOTS), _) -> {:ok, diff --git a/spec/refreshcache_spec.exs b/spec/refreshcache_spec.exs index 39507a4..751d840 100644 --- a/spec/refreshcache_spec.exs +++ b/spec/refreshcache_spec.exs @@ -2,11 +2,11 @@ defmodule RedixCluster.RefreshCache.Spec do use ESpec before do - allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self} end, [:non_strict, :unstick] + allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self()} end, [:non_strict, :unstick] allow Redix |> to accept :stop, fn(_) -> :ok end, [:non_strict, :unstick] allow Redix |> to accept :command, fn (_, ~w(CLUSTER SLOTS), _) -> - case get_version do + case get_version() do 1 -> {:ok, [[10923, 16383, ["10.1.2.5", 7000], ["10.1.2.5", 7001]], [5461, 10922, ["10.1.2.6", 7000], ["10.1.2.7", 7001]], @@ -17,7 +17,7 @@ defmodule RedixCluster.RefreshCache.Spec do [0, 5000, ["10.1.2.7", 7000], ["10.1.2.6", 7001]]]} end (_, ~w(set a test), _) -> - case get_version do + case get_version() do 1 -> {:error, %Redix.Error{message: "MOVED 15495 10.1.2.5:7000"}} _ -> {:ok, "OK"} end @@ -31,20 +31,19 @@ defmodule RedixCluster.RefreshCache.Spec do finally do: {:shared, count: shared.count + 1} context "refresh cache test" do - it do: expect refresh_test |> to eq {{:ok, "OK"}, 1, 2} + it do: expect refresh_test() |> to eq {{:ok, "OK"}, 1, 2} end defp refresh_test do - old = get_version + old = get_version() result = RedixCluster.command(~w(set a test)) - new = get_version + new = get_version() {result, old, new} end defp get_version() do - case RedixCluster.Monitor.get_slot_cache do - {:not_cluster, version, _} -> version - {:cluster, _, _, version} -> version + case RedixCluster.SlotCache.get_slot_maps do + {version, _} -> version end end diff --git a/spec/transation_spec.exs b/spec/transation_spec.exs index e3763bf..f7a742f 100644 --- a/spec/transation_spec.exs +++ b/spec/transation_spec.exs @@ -2,7 +2,7 @@ defmodule RedixCluster.Transation.Spec do use ESpec, shared: true before do - allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self} end, [:non_strict, :unstick] + allow Redix |> to accept :start_link, fn(_, _) -> {:ok, self()} end, [:non_strict, :unstick] allow Redix |> to accept :stop, fn(_) -> :ok end, [:non_strict, :unstick] allow Redix |> to accept :command, fn (_, ~w(CLUSTER SLOTS), _) -> {:ok,