EventMachine ⇒ сбор информации из разных источников с последующей обработкой

в 5:50, , рубрики: async, eventmachine, ruby, метки: ,

EventMachine ⇒ сбор информации из разных источников с последующей обработкой
Самый простой способ наступить на грабли — использовать асинхронность. Я знаком с программистами, зарекомендовавшими себя как крепкие профессионалы, которые буквально пасовали перед многопоточностью. Для затравки расскажу мою любимую историю про deadlock (прошу прощения за боян, но уж больно хорош). Лет десять назад Associated Press поведало миру, как в аэропорту шведского города Крисианстад пилот пытался посадить пассажирский самолет, однако никто из диспетчеров не ответил на его запрос. Оказалось, что диспетчер еще не вернулся из отпуска. В итоге самолет кружил над аэропортом, пока срочно не вызвали запасного диспетчера, который и
посадил самолет через полчаса. Разбор полетов показал, что причиной стало опоздание самолета. На борту которого и находился тот самый диспетчер, спешивший на работу из отпуска.

Итак, когда мы сталкиваемся с асинхронностью, нам приходится ломать привычную картину в голове: субъективно окружающий нас мир однопоточен. Если мы послали письмо, а через неделю получили ответ, для нас все происходит в пределах одного потока; нам не приходится отвечать за действия респондента и почтальона. А нашему коду — приходится.
Чтобы упростить жизнь программиста, можно использовать паттерн Реактор. Лучшая (на мой взгляд) его имплементация для руби — EventMachine. Но и с ней бывают не очевидные моменты. Об одном из них я и планирую вкратце рассказать.

EventMachine

gem install eventmachine

Класс EventMachine более-менее документирован и разобраться с простыми запросами не составит труда. Обычно все происходит как-то так (EM — алиас для EventMachine):


begin
  EM.run do
  … # тут-то все и происходит, например, EM.connect (…)
  # бесконечно печатать всякую фигню
  EM.add_periodic_timer(1) { puts "on tick" } 
  end
ensure
  EM.stop # это тут просто для примера, деструктор остановит все сам
 end

На остановку реактора можно повесить хук (EventMachine.add_shutdown_hook { puts «Exiting…» }). Ну и, разумеется, можно на лету создавать асинхронные соединения. Документация, повторюсь, есть. Местами даже внятная.
Но хватит занудства.

Сбор результатов

Пока все ограничивается моделью «запрос → обработка ответа», никаких проблем нет. Но что делать, если нам необходимо отослать следующий запрос в зависимости от результата предыдущего? Чтобы не делать заметку слишком длинной и не разжевывать совсем простые моменты, сразу перейду к задаче:

отыскать на жаббир-сервере через дискавери нужный нам компонент и пообщаться именно с ним

На словах это происходит как-то так: отсылаем запрос к дискавери, получаем список компонентов, каждый компонент опрашиваем на предмет его возможностей. Если в списке features есть и наша, проводим инициализацию.
Вот как это выглядит с использованием EventMachine (я убрал все, что не касается непосредственно работы с EM):


@stream.write_with_handler(disco) do |result|
…
  # iterate thru disco results and wait until all collected
  EM::Iterator.new(result.items).map proc{ |c, it_disco|
…
    @stream.write_with_handler(info) do |reply|
…
      # iterate thru discoinfo results and wait until all collected,
      # then switch the parent’s iterator
      EM::Iterator.new(reply.features).map proc{ |f, it_info|
        it_info.return …
      }, proc{ |comps|
        # one more disco item was utilized
        it_disco.return …
      }
…             
    end
…
  }, proc{ |compss|
    # yielding
    # compss.init or smth like that
…
  }
end

Всю работу за нас выполнят итераторы и их волшебная функция map. Код лямбды под последними скобками (комментарий «yielding») выполнится только тогда, когда будут собраны все discoinfo для каждого найденного компонента.

Прошу прощения, если кому-то это покажется очевидным, но гугл мне не подсказал быстрого решения, а возня с Fiber’ами здесь превращается в чистый ад.

Автор: matiouchkine

Источник

Поделиться

* - обязательные к заполнению поля