Горизонтальное масштабирование websocket-ов на Ruby

в 5:24, , рубрики: ruby, sinatra, WebSocket, websocket server, Веб-разработка

Не так давно вышла статья, в которой автор описывал свой framework для написания приложений с использованием Ruby, Sinatra и websoсket. Но в том решении не был затронут вопрос горизонтального масштабирования. Так при подключении к одному из узлов, пользователи могут получать уведомления/данные только о событиях/изменениях, вызванных пользователями этого же узла, а при изменениях, внесенных через другой, они не узнают. Для решения данной задачи необходимо организовать общую шину данных. Рассматривать данную задачу буду в контексте обмена сообщениями клиент-клиент.

Шина данных

Требования, которые будем предъявлять к шине следующие:

  • простота работа;
  • передача в «реальном времени»;
  • производительность.

Организовать шину можно через хранилище с периодическим опросом, либо через сервер очередей.
Первый вариант не удовлетворяет второму условию, т.к. задержка в передаче будет ровна периоду опроса хранилища. Уменьшение периода приведет к росту нагрузки на него. Поэтому этот вариант отметаем сразу.

Второй вариант подходит лучше всего. В данном случае можно воспользоваться специализированными решениями на подобии RabbitMQ, ActiveMQ. Оба этих продукта представляют из себя серьезные решения, со множеством функций, хорошим масштабированием. Можно использовать и их, но нужно оценить, не будет ли это пушкой по воробьям. Кроме подобных решений функционал очередей предоставляет и Redis, в добавок получаем key-value хранилище, которое нам тоже понадобится.

Redis предоставляет простейший механизм Pub-Sub, которого достаточно для нашей задачи. Он достаточно быстр, прост в работе и имеет малые задержки при передаче.

Решение

Наша система будет иметь следующую схему.

Горизонтальное масштабирование websocket-ов на Ruby - 1

Сообщения между пользователями одного узла передаются напрямую, а сообщения между узлами через шину.
Для этого:

  1. узел генерирует уникальное имя;
  2. подписывается по нему на сообщения в Redis;
  3. все клиенты подключенные к этому узлу записывают пару ключ-значение в виде идентификатора клиента и идентификатора узла, к которому он подключен;
  4. при отправке сообщения другому клиенту, узнаем имя узла и передаем сообщения в его очередь для обработки.

А теперь реализуем

В качестве библиотеки для websocket выбран faye-websocket-ruby. Для работы с Redis стандартный гем redis (hiredis) + код примера для PubSub через EventMachine, так как реализация из гема работает в блокирующем режиме, а при работе в одном потоке с web-сервером это не допустимо.

module App
  class << self
    def configuration
      yield(config) if block_given?
      config.sessions = Metriks.counter('total_sessions')
      config.active = Metriks.counter('active_sessions')
    end    
    def config			
      @config ||= OpenStruct.new( redis: nil, root: nil )
    end	
    def id
      @instance_id ||= SecureRandom.hex
    end
    def logger
      @logger ||= Logger.new $stderr
    end

    def register
      config.redis.multi do
        config.redis.set "node_#{App.id}", true
        config.redis.expire "node_#{App.id}", 60*10
      end if config.redis

      EM.next_tick do        
        config.sub = PubSub.connect
        config.sub.subscribe App.id do |type, channel, message|
          case type
            when 'message'
              begin
                json = Oj.load(message, mode: :compat)
                WS::Base.remote_messsage json
              rescue => ex
                App.logger.error "ERROR: #{message.class} #{message} #{ex.to_s}"
              end
            else
              App.logger.debug "(#{type}) #{channel}:: #{message}"
          end
        end
        @pingpong = EM.add_periodic_timer(30) do
          App.config.redis.expire "node_#{App.id}", 60
        end
      end
    rescue
      config.redis = nil
    end
  end
end

Основная работа этого модуля заключается в методе register, который регистрирует себя на шине и ожидает входящие сообщения. Для мониторинга создается ключ вида node_%node_id% c TTL в 60 секунд и периодом обновления 30 секунд, на случай если узел отвалится. Таким образом можно всегда узнать сколько узлов сейчас находится в сети и их имена.

module WS
  class Base
    NEXT_RACK = [404, {}, []].freeze
    def self.call(*args)
      instance.call(*args)
    end
    def self.instance
      @instance ||= self.new
    end
    def self.remote_messsage(json)
      user = User.get json['from']
      instance.send :process, user, json if user
    rescue => ex
      user.error( { error: ex.to_s } )
    end
    def initialize
      @ws_cache = {}
    end
    def call(env)
      return NEXT_RACK unless Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env, ['xmpp'], ping: 5)
      user = User.register(ws)
      ws.onmessage = lambda do |event|
        json = Oj.load(event.data, mode: :compat)
        process(user, json )
      end
      ws.onclose = lambda do |event|
        App.logger.info [:close, event.code, event.reason]
        user.unregister
        user = nil
      end
      ws.rack_response
    rescue WS::User::NotUnique => ex
      ws.send Oj.dump({ action: :error, data: { error: 'not unique session' } })
      ws.close
      ws.rack_response
    end
    private

    def process(user, json)
      action = json['action'].to_s
      data = json['data']
      return App.logger.info([:message, 'Empty action']) if action.empty?
      return App.logger.info([:message, "Unknown action #{json['action']}"]) unless user.respond_to? "on_#{action}"
      user.send "on_#{action}", data
    rescue => ex
      user.error({ error: ex.to_s })
      puts ex.to_s
      puts ex.backtrace
    end
  end
end

Данный класс отвечает за установление соединения и обработку сообщений. В методе call создается новый клиент и вешаются обработчики. Метод класса remote_messsage используется для приема внешних сообщений (из шины). Метод process — единая точка для сообщений пришедших напрямую от клиента и для сообщений пришедших по шине.

Клиенты

module WS
  class User
    include UserBehavior
    attr_reader :id
    class Error < StandardError; end
    class RoomFull < Error; end
    class NotFound < Error
      attr_reader :id
      def initialize(id); @id = id end
      def to_s; "User '@#{id}' not found" end
    end
    class NotUnique < Error; end

    class  << self
      def cache
        @ws_cache ||= {}
      end

      def get(id)
        fail NotFound.new(id) if id.to_s.empty?
        @ws_cache.fetch(id)
      rescue KeyError
        WS::RemoteUser.new(id)
      end

      def register(ws)
        self.new(ws)
      end

      def unregister(ws)
        url = URI.parse(ws.url)
        id = url.path.split('/').last
        get(id).unregister
      end
    end

    def initialize(ws)
      @ws = ws
      register

      @pingpong = EM.add_periodic_timer(5) do
        @ws.ping('') do
          App.config.redis.expire @id, 15 if App.config.redis
        end
      end
    end

    def unregister
      on_close if respond_to? :on_close
      App.config.active.decrement
      App.config.redis.del @id if App.config.redis
      User.cache.delete(@id)
      @pingpong.cancel
      @pingpong = nil
      @ws = nil
      @id = nil
    end

    def send_client(from, action, data)
      return unless @ws
      data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
      @ws.send(data)
    end

    private
    def register
      url = URI.parse(@ws.url)
      @id = url.path.split('/').last
      if App.config.redis
        App.config.redis.multi do
          App.config.redis.set @id, App.id
          App.config.redis.expire @id, 15
        end
        App.config.sessions.increment
        App.config.active.increment
      end
      User.cache[@id] = self
      App.logger.info [:open, @ws.url, @ws.version, @ws.protocol]
      on_register if respond_to? :on_close
      self
    end
  end

  class RemoteUser
    include UserBehavior
    attr_reader :id
    attr_reader :node
    def initialize(id)
      @id = id.to_s
      fail WS::User::NotFound.new(id) if @id.empty?
      @node = App.config.redis.get(@id).to_s
      fail WS::User::NotFound.new(id) if @node.empty?
    end
    def send_client(from, action, data)
      return if node.to_s.empty?
      App.logger.info ['REMOTE', self.id, from.id, action]
      data = Oj.dump({ from: from.id, action: action.to_s, data: data }, mode: :compat)
      App.config.redis.publish node, data
    end
  end
end

Метод register регистрирует пользователя в хранилище, сопоставляя его ID с ID узла куда он подключен и кэширует его в локальном списке. Метод unregister напротив убирает все записи о клиенте и удаляет таймер. Таймер используется для периодической проверки состояние клиента и обновления TTL для его записи, чтобы в Redis не было мертвых душ.
ID клиента получается из URL по которому был запрос на подключение. Он имеет формат ws://%hostname%/ws/%user_id% где user_id случайно сгенерированная уникальаня последовательность.

Метод send_client отправляет данные уже самому клиенту.

Отдельное место занимает метод класса get. Данный метод возвращает по ID экземпляр класса WS::User либо если пользователь не найден в локальном кэше создает экземпляр класса WS::RemoteUser. При его создании проверяется есть ли такой ID в хранилище и какому узлу он принадлежит. Если ID не найдет кидается исключение.

Класс WS::RemoteUser в отличии от WS::User имеет только один метод send_client, который пересылает сформированные сообщения через шину на требуемый узел.

Таким образом, неважно где находится клиент вызов метода send_client доставит данные до адресата.

module UserBehavior
  module ClassMethods
    def register_action(action, params = {})
      return App.logger.info ['register_action', "Method #{action} already defined"] if respond_to? action

      block = lambda do |*args |
        if block_given?
          data, from = yield(self, *args)
          send_client from || self, action, data
        else
          send_client self, action, args.first
        end
      end

      define_method action, &block
      define_method "on_#{action}" do |data|
        self.send action, data
      end if params[:passthrough]

    end
  end

  def self.included(base)
    base.instance_exec do
      extend ClassMethods
      register_action :message do |user, from, text|
        [{ to: user.id, text: text }, from]
      end

      register_action :error, passthrough: true
    end
  end

  def on_message(data)
    App.logger.info ['MESSAGE', id, data.to_s]

    to_user_id = data['to']
    to_user = WS::User.get(to_user_id)
    to_user.message self, data['text']

  rescue WS::User::NotFound  => ex
    error({ error: ex.to_s })
  end
end

Обработка самих событий вынесена в отдельный модуль UserBehavior, который расширяет предыдущие два класса методами для реакции на сообщения. Каждое сообщение имеет поля FROM, ACTION и DATA. Первое идентифицирует от кого пришло, второе определяет метод, а третья сопутствующие данные. Так для ACTION со значением «message» будет вызван метод on_message, в который будет передано значение поля DATA.

Используя такой подход получилось реализовать прозрачную передачу сообщений между подключенными клиентами, при этом не важно находятся они на одном узле или на разных. Для тестирования запускал несколько экземпляров на разных портах, сообщения корректно отправлялись и получались.

Для желающих попробовать, код рабочего приложения выложил на github. Запускается просто, через rackup

PS

Данное решения не является законченным, думаю есть куда его улучшить и убрать лишнее, но как отправная точка вполне сгодится.

Автор: fuCtor

Источник

* - обязательные к заполнению поля