Elixir: Регистрируем процессы — практическое руководство

в 9:39, , рубрики: Elixir, erlang, Erlang/OTP, functional programming, otp, конкурентное программирование, параллельное программирование, функциональное программирование

Elixir: Регистрируем процессы — практическое руководство - 1

Процессы в Elixir (ну и в Erlang конечно же) идентифицируются с помощью уникального идентификатора процессаpid.
Мы используем их, чтобы взаимодействовать с процессами. Сообщения посылаются как бы в pid, а виртуальная машина сама заботится о доставке этих сообщений в правильный процесс.
Иногда, впрочем, чрезмерное доверие к pid может приводить к значительным проблемам.
К примеру, мы можем хранить pid уже мёртвого процесса, или мы можем использовать Supervisor, который абстрагирует создание процессов от нас, поэтому мы даже не знаем, какой у них pid (пер: а ещё Supervisor можете перезапустить упавший процесс с другим pid, и мы об этом не узнаем никак).
Давайте создадим простое приложение и посмотрим: с какими проблемами мы можем столкнуться и как мы эти проблемы будем решать.

Начинаем вообще без реестра

Для первого примера — создадим простой чат. Начнём с создания mix проекта:

$ mix new chat

Создадим абсолютно стандартный GenServer, который будем использовать на протяжении всех примеров в этом статье:

# ./lib/chat/server.ex

defmodule Chat.Server do
  use GenServer
  # API
  def start_link do
    GenServer.start_link(__MODULE__, [])
  end
  def add_message(pid, message) do
    GenServer.cast(pid, {:add_message, message})
  end
  def get_messages(pid) do
    GenServer.call(pid, :get_messages)
  end
  # SERVER
  def init(messages) do
    {:ok, messages}
  end
  def handle_cast({:add_message, new_message}, messages) do
    {:noreply, [new_message | messages]}
  end
  def handle_call(:get_messages, _from, messages) do
    {:reply, messages, messages}
  end
end

Если такой код кажется вам незнакомым или не понятным — почитайте начало работы с Elixir, в котором есть отличные параграфы об OTP.

Запустим iex сессию с mix окружением и попробуем поработать с нашим сервером:

$ iex -S mix

iex> {:ok, pid} = Chat.Server.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Server.add_message(pid, "foo")
:ok
iex> Chat.Server.add_message(pid, "bar")
:ok
iex> Chat.Server.get_messages(pid)
["bar", "foo"]

Код этого этапа — вот в этом коммите

На этом этапе вроде как всё так хорошо, что просто замечательно. Мы получаем pid процесса, затем для каждого сообщения, которое мы хотим послать (add_message/2 и get_messages/1) мы передаём этот pid — и всё работает настолько предсказуемо, что даже скучно.
Впрочем, веселуха начинается тогда, когда мы попробуем добавить Supervisor...

Очень приятно: я — Supervisor!

Итак, по какой-то причине наш процесс Chat.Server умирает. Мы остаёмся одни в пустой и холодной iex сессии, и у нас нету другого выбора, кроме как запустить новый процесс, получить его pid и писать сообщения уже на этот новый pid. Так давайте же создадим Supervisor — и нам не придётся беспокоится о таких мелочах!

# ./lib/chat/supervisor.ex

defmodule Chat.Supervisor do
  use Supervisor
  def start_link do
    Supervisor.start_link(__MODULE__, [])
  end
  def init(_) do
    children = [
      worker(Chat.Server, [])
    ]
    supervise(children, strategy: :one_for_one)
  end
end

Ну, создать Supervisor очень просто. Но у нас теперь проблема, если модель поведения нашего сервера не изменится. Ведь мы не запускаем процесс Chat.Server сами, Supervisor делает это за нас. И поэтому мы не имеем никакого доступа к pid процесса!

Эта не баг, а фича такого OTP паттерна, как Supervisor. Мы не можем получить доступ к pid его дочерних процессов, потому что он может неожиданно (но, естественно, только в случае необходимости) перезапустить процесс, а фактически убить его и создать новый с новым pid.

Регистрируем имена процессов

Чтобы получить доступ к нашему процессу Chat.Server нам нужно придумать способ указывать на процесс, другой — не pid. нам нужен такой указатель, чтобы он сохранялся даже при рестарте процесса через Supervisor (пер: то есть даже тогда, когда меняется pid).
И такой указатель называется имя!

Для начала, изменим Chat.Server:

# ./lib/chat/server.ex

defmodule Chat.Server do
  use GenServer
  def start_link do
    # We now start the GenServer with a `name` option.
    GenServer.start_link(__MODULE__, [], name: :chat_room)
  end
  # And our function doesn't need to receive the pid anymore,
  # as we can reference the process with its unique name.
  def add_message(message) do
    GenServer.cast(:chat_room, {:add_message, message})
  end
  def get_messages do
    GenServer.call(:chat_room, :get_messages)
  end
  # ...
end

Изменения — вот в этом коммите

Сейчас всё должно работать так же, но только лучше — ведь мы не должны передавать везде этот pid:

$ iex -S mix

iex> Chat.Supervisor.start_link
{:ok, #PID<0.94.0>}
iex> Chat.Server.add_message("foo")
:ok
iex> Chat.Server.add_message("bar")
:ok
iex> Chat.Server.get_messages
["bar", "foo"]

Даже если процесс перезапустится — всё равно мы сможем обратиться к нему тем же способом:

iex> Process.whereis(:chat_room)
#PID<0.111.0>
iex> Process.whereis(:chat_room) |> Process.exit(:kill)
true
iex> Process.whereis(:chat_room)
#PID<0.114.0>
iex> Chat.Server.add_message "foo"
:ok
iex> Chat.Server.get_messages
["foo"]

Ну, для наших текущих задач вроде бы проблемы решены, но давайте попробуем сделать что-нибудь посложнее (и более приближённое к реальным задачам).

Динамическое создание процессов

Представьте, что нам необходимо поддерживать несколько чат-комнат. Клиент может создать новую комнату с именем, и он ожидает, что сможет посылать сообщения в ту комнату, которую он хочет. Тогда интерфейс должен быть приблизительно такой:

iex> Chat.Supervisor.start_room("first room")
iex> Chat.Supervisor.start_room("second room")
iex> Chat.Server.add_message("first room", "foo")
iex> Chat.Server.add_message("second room", "bar")
iex> Chat.Server.get_messages("first room")
["foo"]
iex> Chat.Server.get_messages("second room")
["bar"]

Начнём пожалуй сверху, и изменим Supervisor чтобы всё это поддерживать:

# ./lib/chat/supervisor.ex

defmodule Chat.Supervisor do
  use Supervisor
  def start_link do
    # We are now registering our supervisor process with a name
    # so we can reference it in the `start_room/1` function
    Supervisor.start_link(__MODULE__, [], name: :chat_supervisor)
  end
  def start_room(name) do
    # And we use `start_child/2` to start a new Chat.Server process
    Supervisor.start_child(:chat_supervisor, [name])
  end
  def init(_) do
    children = [
      worker(Chat.Server, [])
    ]
    # We also changed the `strategty` to `simple_one_for_one`.
    # With this strategy, we define just a "template" for a child,
    # no process is started during the Supervisor initialization,
    # just when we call `start_child/2`
    supervise(children, strategy: :simple_one_for_one)
  end
end

И давайте заставим наш Chat.Server принимать имена в start_link функции:

# ./lib/chat/server.ex

defmodule Chat.Server do
  use GenServer
  # Just accept a `name` parameter here for now
  def start_link(name) do
    GenServer.start_link(__MODULE__, [], name: :chat_room)
  end
  #...
end

Изменения — вот в этом коммите

А вот и проблема! У нас же может быть несколько процессов Chat.Server, и они не могут все быть с именем :chat_room. Беда...

$ iex -S mix

iex> Chat.Supervisor.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_room "foo"
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room "bar"
{:error, {:already_started, #PID<0.109.0>}}

Честно говоря, VM очень красноречива. Мы пытаемся создать второй процесс, но процесс с таким именем уже существует, о чём нам так ехидно напоминает среда. Надо придумать какой то другой способ, но какой?..

К сожалению, тип аргумента name определён достаточно чётко. Мы не можем использовать что-то типа {:chat_room, "room name"}. Давайте обратимся к документации:

Поддерживаемые значения:
atom — в этом случае GenServer регистрируется локально с данным именем atom с помощью Process.register/2.
{:global, term} — в этом случае GenServer регистрируется глобально с данным именем term с помощью функций в модуле :global.
{:via, module, term} — в этом случае GenServer регистрируется с помощью определённого в module механизма и имени `term.

Оригинал на английском
The supported values are:
an atom — the GenServer is registered locally with the given name using Process.register/2.
{:global, term} — the GenServer is registered globally with the given term using the functions in the :global module.
{:via, module, term} — the GenServer is registered with the given mechanism and name.

Первую опцию — atom, мы уже использовали, и точно знаем, что в нашем хитром случае она не подходит.
Вторая опция используется для регистрации процесса глобально в кластере нод. Она использует локальную ETS таблицу. Кроме того она будет требовать постоянной синхронизации внутри нод в кластере, в связи с чем работа программы будет замедляться. Так что используйте её только когда это действительно нужно.
Третья, и последняя, опция использует в качестве параметра кортеж с :via, и это как раз то, что нам надо для решения нашей проблемы! Вот что говорит по этому поводу документация:

Опция :via принимает в качестве параметра модуль, который имеет следующий интерфейс: register_name/2, unregister_name/1, whereis_name/1 и send/2.

Оригинал на английском
The :via option expects a module that exports register_name/2, unregister_name/1, whereis_name/1 and send/2.

Вообще ничего не понятно? Мне тоже! Так что посмотрим этот метод в деле.

Используем кортеж :via

Итак, кортеж :via — это способ сказать Elixir, что мы собираемся использовать отдельный модуль для регистрации наших процессов. Этот модуль должен делать следующие вещи:

  • Регистрировать имя, которым может быть любой term, с помощью функции register_name/2;
  • Удалять имена из регистра, с помощью функции unregister_name/1;
  • Находить pid по имени, с помощью whereis_name/1;
  • Посылать сообщения определённому процессу с помощью send/2.

Чтобы это всё работало, вышеперечисленные функции должны передавать ответ в определённом формате, определённом в OTP — так же как и handle_call/3 и handle_cast/2 подчиняются определённым правилам.

Попробуем определить модуль, которые всё это знает:

# ./lib/chat/registry.ex

defmodule Chat.Registry do
  use GenServer
  # API
  def start_link do
    # We register our registry (yeah, I know) with a simple name,
    # just so we can reference it in the other functions.
    GenServer.start_link(__MODULE__, nil, name: :registry)
  end
  def whereis_name(room_name) do
    GenServer.call(:registry, {:whereis_name, room_name})
  end
  def register_name(room_name, pid) do
    GenServer.call(:registry, {:register_name, room_name, pid})
  end
  def unregister_name(room_name) do
    GenServer.cast(:registry, {:unregister_name, room_name})
  end
  def send(room_name, message) do
    # If we try to send a message to a process
    # that is not registered, we return a tuple in the format
    # {:badarg, {process_name, error_message}}.
    # Otherwise, we just forward the message to the pid of this
    # room.
    case whereis_name(room_name) do
      :undefined ->
        {:badarg, {room_name, message}}
      pid ->
        Kernel.send(pid, message)
        pid
    end
  end
  # SERVER
  def init(_) do
    # We will use a simple Map to store our processes in
    # the format %{"room name" => pid}
    {:ok, Map.new}
  end
  def handle_call({:whereis_name, room_name}, _from, state) do
    {:reply, Map.get(state, room_name, :undefined), state}
  end
  def handle_call({:register_name, room_name, pid}, _from, state) do
    # Registering a name is just a matter of putting it in our Map.
    # Our response tuple include a `:no` or `:yes` indicating if
    # the process was included or if it was already present.
    case Map.get(state, room_name) do
      nil ->
        {:reply, :yes, Map.put(state, room_name, pid)}
      _ ->
        {:reply, :no, state}
    end
  end
  def handle_cast({:unregister_name, room_name}, state) do
    # And unregistering is as simple as deleting an entry
    # from our Map
    {:noreply, Map.delete(state, room_name)}
  end
end

Опять же: в наших руках выбрать, каким образом наш реестр будет внутри работать. Здесь мы используем простую Map для связи имени и pid. Этот код абсолютно прост и прямолинеен, особенно если вы хорошо знаете как работает GenServer. Незнакомыми могут казаться только возвращаемые функциями значения.

Пришло время попробовать наш реестр в iex сессии:

$ iex -S mix

iex> {:ok, pid} = Chat.Server.start_link("room1")
{:ok, #PID<0.107.0>}
iex> Chat.Registry.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Registry.whereis_name("room1")
:undefined
iex> Chat.Registry.register_name("room1", pid)
:yes
iex> Chat.Registry.register_name("room1", pid)
:no
iex> Chat.Registry.whereis_name("room1")
#PID<0.107.0>
iex> Chat.Registry.unregister_name("room1")
:ok
iex> Chat.Registry.whereis_name("room1")
:undefined

5 секунд — полёт отличный! Реестр работает как надо: и регистрирует, и удаляет регистрацию. Попробуем его использовать в наших чатах.

Наша проблема была в том, что у нас были нескольких запущенных серверов Chat.Server, инициализированных через Supervisor. Чтобы отправить сообщение в определённую комнату, мы хотели бы вызывать Chat.Server.add_message(“room1”, “my message”), поэтому мы должны были бы регистрировать имена серверов как {:chat_room, “room1”} и {:chat_room, “room2”}. Вот как это делается через кортеж :via:

# ./lib/chat/server.ex
defmodule Chat.Server do
  use GenServer
  # API
  def start_link(name) do
    # Instead of passing an atom to the `name` option, we send 
    # a tuple. Here we extract this tuple to a private method
    # called `via_tuple` that can be reused in every function
    GenServer.start_link(__MODULE__, [], name: via_tuple(name))
  end
  def add_message(room_name, message) do
    # And the `GenServer` callbacks will accept this tuple the 
    # same way it accepts a pid or an atom.
    GenServer.cast(via_tuple(room_name), {:add_message, message})
  end
  def get_messages(room_name) do
    GenServer.call(via_tuple(room_name), :get_messages)
  end
  defp via_tuple(room_name) do
    # And the tuple always follow the same format:
    # {:via, module_name, term}
    {:via, Chat.Registry, {:chat_room, room_name}}
  end
  # SERVER (no changes required here)
  # ...
end

Изменения — вот в этом коммите

Вот что здесь происходит: каждый раз, когда мы посылаем сообщение в Chat.Server, передавая имя комнаты, он сам будет находить pid нужного процесса с помощью того модуля, который мы ему передали в кортеже :via (в данном случае это Chat.Registry).
Это решает нашу проблему: теперь мы можем использовать любое количество Chat.Server процессов (ну, пока не закончится фантазия на имена), и нам никогда не надо знать их pid. Совсем.

Впрочем, есть ещё одна проблема в таком решении. Догадались?
Именно! Наш реестр не знает о процессах, которые упали, и должны быть перезапущены через Supervisor. А это значит, что когда такое произойдёт, реестр не даст пересоздать запись с таким же именем, и будет хранить pid мёртвого процесса.

По идее, решение этой проблемы — не слишком сложное. Мы заставим наш реестр осуществлять мониторинг всех процессов, pid которых он хранит. Как только такой "наблюдаемый" процесс упадёт — мы его просто удалим из нашего реестра.

# in lib/chat/registry.ex
defmodule Chat.Registry do
  # ...
  def handle_call({:register_name, room_name, pid}, _from, state) do
    case Map.get(state, room_name) do
      nil ->
        # When a new process is registered, we start monitoring it.
        Process.monitor(pid)
        {:reply, :yes, Map.put(state, room_name, pid)}
      _ ->
        {:reply, :no, state}
    end
  end
  def handle_info({:DOWN, _, :process, pid, _}, state) do
    # When a monitored process dies, we will receive a
    # `:DOWN` message that we can use to remove the 
    # dead pid from our registry.
    {:noreply, remove_pid(state, pid)}
  end
  def remove_pid(state, pid_to_remove) do
    # And here we just filter out the dead pid
    remove = fn {_key, pid} -> pid  != pid_to_remove end
    Enum.filter(state, remove) |> Enum.into(%{})
  end
end

Изменения — вот в этом коммите

Убедимся в том, что всё работает:

$ iex -S mix

iex> Chat.Registry.start_link
{:ok, #PID<0.107.0>}
iex> Chat.Supervisor.start_link
{:ok, #PID<0.109.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.111.0>}
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]
iex> Chat.Registry.whereis_name({:chat_room, "room1"}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "message")
:ok
iex> Chat.Server.get_messages("room1")
["message"]

Ну, теперь уже совершенно не важно, сколько раз Supervisor перезапустит процесс Chat.Server: как только мы посылаем сообщение в комнату — оно будет доставлено по верному pid.

Упрощаем с помощью gproc

В принципе с нашим чатом мы на этом и закончим, но хочется рассказать ещё об одной фиче, которая упростит для нас регистрацию с помощью кортежа :via. Это — gproc, — библиотека Erlang.
И научим наш Chat.Server использовать gproc вместо нашего Chat.Registry, а потом мы вообще избавимся от Chat.Registry.

Начнём пожалуй с зависимостей. Для этого добавим gproc в mix.exs:

# ./mix.exs

defmodule Chat.Mixfile do
  # ...
  def application do
    [applications: [:logger, :gproc]]
  end
  defp deps do
    [{:gproc, "0.3.1"}]
  end
end

Затем подтянем зависимости с помощью:

$ mix deps.get

Теперь мы можем поменять нашу регистрацию с помощью кортежа :via — пусть использует gproc, а не Chat.Registry:

# ./lib/chat/server.ex

defmodule Chat.Server do
  # ...
  # The only thing we need to change is the `via_tuple/1` function,
  # to make it use `gproc` instead of `Chat.Registry`
  defp via_tuple(room_name) do
    {:via, :gproc, {:n, :l, {:chat_room, room_name}}}
  end
  # ...
end

gproc использует ключи-кортежи, состоящие из трёх значений: {type, scope, key}.

В нашем случае мы используем:

  • :n — это значит имя, то есть не может быть больше одного процесса, зарегистрированного под таким ключом;
  • :l — это значит local, то есть процесс регистрируется только на нашей ноде;
  • {:chat_room, room_name} — это сам ключ в виде кортежа.

Дополнительную информацию по возможным настройкам gproc искать тут.

После таких изменений вообще выкинем наш Chat.Registry, и проверим что всё продолжает работать в iex сессии:

$ iex -S mix

iex> Chat.Supervisor.start_link
{:ok, #PID<0.190.0>}
iex> Chat.Supervisor.start_room("room1")
{:ok, #PID<0.192.0>}
iex> Chat.Supervisor.start_room("room2")
{:ok, #PID<0.194.0>}
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.add_message("room2", "second message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]
iex> Chat.Server.get_messages("room2")
["second message"]
iex> :gproc.where({:n, :l, {:chat_room, "room1"}}) |> Process.exit(:kill)
true
iex> Chat.Server.add_message("room1", "first message")
:ok
iex> Chat.Server.get_messages("room1")
["first message"]

Изменения — вот в этом коммите

Куда плыть дальше, капитан?

Мы с вами разобрались в куче сложных вопросов. Основные выводы:

  • Будьте осторожны, работая с pid напрямую: они поменяются как только процесс перезапустится.
  • Если вам нужно получить ссылку только на один процесс ( как у нас было с единственной комнатой в чате), регистрация процесса с именем в виде атома — достаточная мера;
  • Если вам нужно создавать процессы динамически (множество чат-комнат), вы можете использовать кортеж :via для предоставления своего собственного реестра;
  • Подобные реестры уже существуют ( к примеру gproc), и если вы их используете — не придётся строить свой велосипед;

Конечно, это ещё не всё. Если вам нужна глобальная регистрация на всех нодах в кластере, другие средства тоже могут быть хороши. У Erlang есть глобальные модули для глобальных регистраций, pg2 для групп процессов, да и тот же gprc может вам помочь.

Если эта статья вас заинтересовала — почитайте Saša Jurić. Elixir in Action.

А вот и репка с сыром)

Автор: Virviil

Источник

Поделиться новостью

* - обязательные к заполнению поля