Паттерн передачи scala.concurrent.Promise в актор: особенности использования и альтернативы

в 12:14, , рубрики: actor, akka, akka-http, promise, scala, Программирование

В процессе поддержки различных проектов я несколько раз попадал в ситуацию, при которой по причине неправильной работы с Promise возникали проблемы на продакшне. Причём паттерн этой самой неправильной работы всегда был один и тот же, но скрывался он в разных обличьях. Более того, ошибочный код был написан различными людьми. К тому же, ни в одной статье по работе с Promise я толком не нашёл упоминание проблемы, которую хочу осветить. Так что предполагаю, что многие забывают про проблему, про которую я расскажу.

Интересно почитать много примеров асинхронного кода на Scala, с промисами, фьючами и акторами? Добро пожаловать под кат!

Немного о Future

Для начала, немного теории про Future в качестве вступления. Знакомые с этим типом из стандартной библиотеки Scala могут смело пропускать эту часть.

В Scala для представления асинхронных вычислений используется тип Future[T]. Пусть нам нужно вытащить из СУБД значение по ключу. Сигнатура синхронной функции для такого запроса выглядела бы примерно так:

trait SyncKVStore[K, V] {
  def get(key: K): V
}

Использовать её тогда можно было бы так:

val store: SyncKVStore[String, String] = ...
val value = store.get("1234") // value сразу типа String

У такого подхода есть недостаток: метод get() может быть блокирующим, причём ввиду возможной передачи данных по сети на относительно длительный срок. Попробуем сделать его неблокирующим, тогда трейт будет выглядеть так:

import scala.concurrent.Future

trait KVStore[K, V] {
  def get(key: K): Future[V]
}

Не исключено, что для корректного переписывания реализации нам понадобится воспользоваться асинхронным драйвером для используемой нами СУБД. Для примера же напишем in-memory реализацию на мапе:

import scala.concurrent.ExecutionContext

class DummyKVStore(implicit ec: ExecutionContext) extends KVStore[String, String] {

  // Future(...) вычислит переданное значение в другом потоке
  // для этого и нужен неявный ExecutionContext
  def get(key: String): Future[String] = Future(map(key))

  private val map = Map("1234" -> "42", "42 -> 13", "1a" -> "b3")
}

Воспользоваться полученным значением мы можем, например, следующим образом:

// в продакшн-коде лучше использовать кастомный ExecutionContext
import scala.concurrent.ExecutionContext.Implicits.global

val store = new DummyKVStore
// map также требуется неявный ExecutionContext
// лямбда вызовется только после успешного завершения фьючи
store.get("1234").map { value =>
  log.info(value) // для DummyKVStore выведет 42
}

У Future есть несколько полезных асинхронных методов обработки значения, опишем вкратце некоторые из них:

  • map(f: A => B) — по завершении фьючи преобразует успешный результат с помощью функции f
  • flatMap(f: A => Future[B]) — тоже преобразует успешный результат, но принимает, по сути, асинхронную функцию
  • foreach(f: A => Unit) — по успешному завершению выполняет функцию f, передав ей результат фьючи
  • onComplete(f: Try[A] => Unit) — то же, что foreach, но выполняет функцию по любому завершению, в т.ч. с ошибкой

Также стоит уточнить, что все эти методы принимают также параметр implicit ec: ExecutionContext, содержащий информацию о контексте выполнения фьюч, как можно догадаться по названию.

Более подробно про фьючи можно почитать, например, здесь.

Promise: ещё пара абзацев теории

Итак, что же такое Promise? Фактически, это типизированный write-once контейнер, содержащий в себе фьючу:

val p = Promise[Int]()
p.success(42)
p.future // вернёт Future с результатом 42

У него есть множество методов для того, чтобы фьюча завершилась, например:

  • success(t: T) — завершает фьючу с результатом t
  • failure(e: Throwable) — фэйлит фьючу с эксепшном e
  • complete(try: Try[T]) — завершает фьючу, если trySuccess; фэйлит, если Failure
  • completeWith(future: Future[T]) — завершает внутреннюю фьючу, когда завершается future

Таким образом, промисы можно использовать для создания фьюч.

Promise: погружаемся в практику использования

Представим себе, что мы хотим реализовать собственный асинхронный API поверх готового асинхронного Java API на коллбеках. Поскольку результат, который хотим вернуть во фьюче, доступен только в коллбеке, мы не сможем использовать напрямую метод Future.apply(). Здесь нам и поможет Promise. В этом ответе на SO есть, казалось бы, отличный пример использования Promise в реальном мире:

def makeHTTPCall(request: Request): Future[Response] = {
  val p = Promise[Response]()
  registerOnCompleteCallback { buffer =>
    val response = makeResponse(buffer)
    p.success(response)
  }
  p.future
}

Что ж, используем эту функцию в своём новом веб-сервисе, например, на Akka-HTTP. Для начала подключим в SBT зависимость:

libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.0.10"

И напишем код сервиса:

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.marshalling.GenericMarshallers.futureMarshaller

import scala.concurrent.ExecutionContext.Implicits.global

object WebService extends App {

  // нужен для создания и работы сервиса на Akka-HTTP
  implicit val system = ActorSystem()

  val store = new DummyKVStore

  val route =
    // обрабатываем запрос GET /value/$id
    (get & path("value" / IntNumber)) { id =>
      complete {
        for {
          value <- store.get(id)
          response <- makeHTTPCall(new RequestImpl(value))
        } yield response
      }
    }

  Http().bindAndHandle(route, "localhost", 80)
}

Примечание: метод complete() из Akka-HTTP умеет принимать Future[T], для этого импортируется futureMarshaller. Он ответит на HTTP-запрос после завершения фьючи.

А ещё мы решили зашедулить таск, который будет отсылать значение по некоторому ключу всем email-ам из нашей базы через некоторое HTTP API. Причём делает это в цикле: после завершения рассылки всем клиентам начинает делать это заново.

def allEmails: Seq[String] = ...

def sendEmails(implicit ec: ExecutionContext): Future[Unit] = {
  Future.sequence {
    for {
      email <- allEmails
    } yield for {
      value <- store.get("42")
      response <- makeHTTPCall(new SendMailRequest(email, value))
    } yield response
  }.flatMap(_ => sendEmails) // по завершении отправки всех писем запускает отправку ещё раз
}

Выложили всё это в продакшн. Однако через пару дней к нам приходят консьюмеры нашего API и жалуются на периодические отваливания сервиса по таймауту. А ещё через три дня мы обнаруживаем, что таск перестал рассылать почту! В чём же дело?

В логе видим такие стектрейсы:

some.package.SomeException
  at some.package.makeResponse(...)
  at some.package.$anonfun$makeHTTPCall$1(...)
  ...

Выходит, что метод makeResponse() кинул эксепшн. Глядя на исходники makeHTTPCall(), можно заметить, что в таком случае фьюча, которую он возвращает, никогда не завершается!

val p = Promise[Response]
registerOnCompleteCallback(buffer => {
  val response = makeResponse(buffer) // этот метод кидает эксепшн
  p.success(response)                 // тогда success() не вызывается
})
p.future

Именно поэтому наш API отваливался по таймауту, а цикл рассылки писем перестал работать. Увы, в Scala мы не можем программировать, не думая о том, что любая функция может вернуть эксепшн, как хотят многие...

Итак, вспоминаем, что метод Try.apply() умеет перехватывать эксепшны и возвращать Success со значением, либо Failure с брошенным исключением. Пофиксим тело лямбды наивным способом и отправим на код ревью:

import scala.util._

Try(makeResponse(buffer)) match {
  case Success(r) => p.success(r)
  case Failure(e) => p.failure(e)
}

Впрочем, на ревью нам подсказывают, что у промиса есть метод complete(), который сам делает то же, что мы написали руками:

p.complete(Try(makeResponse(buffer))

Итак, что мы узнали о Promise:

  1. Если в начале метода объявлен Promise, а в конце возвращается его фьюча, это совершенно не значит, что эта фьюча когда-либо завершится.
  2. Полезно было бы рассматривать Promise как ресурс, который необходимо всегда закрывать. Однако промис обычно объявляется и закрывается в разных потоках, поэтому использовать его как ресурс с помощью каких-либо стандартных конструкций языка (try-finally), или даже библиотек (scala-arm) проблематично.

Возможно, кто-то скажет, что это надуманный пример и в реальной жизни никто не забывает закрывать промисы? Что ж, для таких скептиков у меня есть ответ в виде нескольких реальных багов/PR-ов в Akka, связанных с некомплитящимися в определённых ситуациях промисами:

  1. QueueSource does not complete onComplete future on abrupt termination
  2. IgnoreSink doesn't complete mat future on abrupt termination
  3. Calling offer on completed queue's would return a never completable future

Кроме того, не всегда всё так просто и очевидно, как в этом примере.

Последний кусочек теории: небольшое введение в акторы

Знакомые с акторами могут пропустить эту часть.

Нам понадобится немного знаний об акторах Akka в дальнейшем в этой статье. Подключим модуль akka-actor в наш проект, пример для SBT:

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.7"

Актор в Akka — это объект с некоторым поведением, асинхронно получающий сообщения. По дефолту поведение определяется в методе receive:

import akka.actor._
import akka.actor.Actor.Receive

val log: Logger = ourLogObject()

case object HelloRequest

class HelloActor extends Actor {

  // Receive -- алиас для частичных функций
  def receive: Receive = {
    // когда получаем сообщение HelloRequest, пишем в лог "hello!"
    case HelloRequest =>
      log.info("hello!")
  }
}

// создали систему для работы с акторами
val system = ActorSystem()
// создали в этой системе наш актор
val actor: ActorRef = system.actorOf(Props(new HelloActor))
// асинхронно выведет "hello!" в лог
actor ! HelloRequest

После создания актор доступен не напрямую, а через прокси под названием ActorRef. Через этот прокси можно асинхронно отправлять сообщения с помощью метода ! (алиас для tell), и эти сообщения будут обработаны методом, определяющим поведение актора. Сообщения должны быть сериализуемыми, поэтому часто для сообщений создают case object (при отсутствии параметров у сообщения) или case class. Актор может одновременно обрабатывать не больше одного сообщения, поэтому его можно использовать в том числе как примитив синхронизации.

Есть ещё один важный момент: актор может менять свою функцию поведения, то есть фактически, обработки сообщений. Для этого в акторе нужно вызвать метод context.become(newReceive), где newReceive — параметр типа Receive. После этого, начиная со следующего сообщения, начнётся обработка функцией newReceive вместо дефолтного receive.

Соединяем части паттерна: передаём промис в актор

Итак, перейдём к следующему примеру.

Пусть нам нужно написать клиента для какого-нибудь сервиса. Например, для букинга. Допустим, мы хотим уметь получать информацию об отеле по id.

case class Hotel(id: Long, name: String, country: String) // весьма упрощённая модель

trait BookingClient {

  def getHotel(id: Long): Future[Hotel]
}

Теперь нужно определить метод, который обратится к API букинга и обработает ответ. Для этого мы воспользуемся асинхронным HTTP-клиентом библиотеки Akka-HTTP. Подключим его зависимостью:

libraryDependencies ++= "com.typesafe.akka" %% "akka-http" % "10.0.10"

Наш метод хотят запускать с довольно большим RPS в течение недолгого времени, а время ответа не очень важно. У клиента Akka-HTTP есть особенность: он не даёт запускать в параллель больше, чем akka.http.host-connection-pool.max-connections запросов. Воспользуемся крайне простым решением: сделаем так, чтобы все запросы шли через актор, то есть в один поток (фактическое решение было немного сложнее, но это неважно для нашего примера). Поскольку мы хотим возвращать фьючу, создадим промис и передадим его в актор, а уже в акторе закомплитим его.

// implicit system и materializer для выполнения запроса через Akka-HTTP; ec для выполнения Future.foreach()
class HttpBookingClient(baseUri: String)(implicit system: ActorSystem, materializer: ActorMaterializer, ec: ExecutionContext) {

  override def getHotel(id: Long): Future[Hotel] = {
    val p = Promise[Hotel]()
    actor ! Request(id, p)
    p.future
  } 

  private val actor = system.actorOf(Props(new ClientActor))

  private case class Request(id: Long, p: Promise[Hotel])

  private case object Completed

  private class ClientActor extends Actor {

    // дефолтный обработчик сообщений, используется при ожидании нового запроса
    override def receive: Receive = {
      case Request(id, p) =>
        val uri = Uri(baseUri).withQuery(Query("id" -> id.toString))
        val eventual = Http().singleRequest(HttpRequest(uri = uri)) // метод вернёт фьючу с информацией об ответе API
        eventual.foreach { response =>
          p.completeWith {
            val unmarshalled = Unmarshal(response.entity)
            // возвращаем найденный отель при коде ответа 200, иначе эксепшн с телом ответа
            response.status match {
              case StatusCodes.OK =>
                // нужно импортировать Unmarshaller[Hotel], например, с помощью akka-http-spray-json
                // для простоты в статье мы это делать не будем
                unmarshalled.to[Hotel]
              case _ =>
                unmarshalled.to[String].flatMap(error => Future.failed(new Exception(error)))
            }
          })
        }
        p.future.onComplete(_ => self ! Completed)
        // меняем обработчик следующих сообщений на running
        context.become(running)
    }

    // обработчик сообщений, используется при ожидании завершения текущего запроса
    // новые запросы откладываются на потом
    private def running: Receive = {
      case request: Request =>
        // имеет смысл переотправлять себе запрос по таймауту, а не сразу; сделано для простоты
        self ! request
      case Completed =>
        // снова меняем обработчик обратно на дефолтный
        context.become(receive)
    }
  }
}

И снова после выкладывания либы всё поначалу шло хорошо, но затем нам прилетел баг-репорт с заголовком "Метод getHotel() возвращает незавершающиеся фьючи". Почему же это произошло? Выглядит, что мы всё предусмотрели, использовали метод completeWith() на всё тело лямбды… Тем не менее, при некоторых условиях фьюча всё же залипает.

Всё дело в том, что лямбда, переданная методу foreach(), запустится только при успешном завершении фьючи eventual. Таким образом, если эта фьюча сфейлилась (например, отвалилась сетка), промис никогда не закомплитится!

Можно предположить, что фикс относительно тривиален: вместо foreach() нужно использовать onComplete(), и в переданной ему лямбде обработать ошибку. Примерно так:

eventual.onComplete {
  case Success(response) => // тот же самый код, что и ранее внутри foreach...
  case Failure(e) => p.failure(e)
}

Это решит проблему с залипанием фьючи конкретно по причине сфейлившегося eventual, но не решает всех возможных проблем с залипанием фьючи, переданной таким образом.

Для простоты дальнейших рассуждений реализуем более простой и вместе с тем более общий пример с передачей промиса в актор для завершения фьючи:

case class Request(arg: String, p: Promise[String])

trait GimmeActor {

  val actor: ActorRef

  def function(arg: String): Future[String] = {
    val p = Promise[String]()
    actor ! Request(arg, p)
    p.future
  }
}

Кстати говоря, конструкция вида тела funcion() нередко встречается, например, в исходном коде Akka и других библиотек. В той же Akka можно найти несколько десятков использований Promise, написанных по такому паттерну:

  1. Создать Promise
  2. Передать его одним из аргументов асинхронной функции (причём в исходниках Akka это часто как раз вызов actor.tell())
  3. Вернуть поле future созданного Promise.

Пара примеров для наглядности:

  1. Здесь в коллбеке на L129 как раз и происходит передача p в актор.
  2. А здесь промис отправляется прямо в параметрах создания актора.

При использовании этого паттерна есть по меньшей мере пара проблем.

  • Два примера классов:

class DoesntEvenKnowAboutRequest extends Actor {

  def receive: Receive = {
    case PoisonPill =>
  }
}

class HandlesRequestWrongWay extends Actor {

  def receive: Receive = {
    case Request(arg, p) =>
  }
}

В обоих случаях промис, очевидно, не будет завершён.

С одной стороны, можно сказать, что эти примеры излишне синтетические. С другой, компилятор никак не защищает нас от таких ошибок. Кроме того, про эту проблему нужно помнить не только для акторов, но и для передачи промисов в обычные асинхронные функции.

  • Пусть реализация актора написана верно и он всегда завершает промис, когда получает Request:

class AlwaysCompletesRequest extends Actor {

  def receive: Receive = {
    case Request(arg, p) =>
      p.success("42")
}

У нас всё хорошо, наши сервисы, использующие этот актор, отлично работают. Впрочем, мы решили в начале запустить сервис на одном сервере.

Но вот, компания растёт, число пользователей тоже, а вместе с ним и количество запросов к нашему актору. На пиках нагрузки актор перестал успевать за необходимое время справляться с потоком сообщений и мы решили заняться горизонтальным масштабированием: запускать AlwaysCompletesRequest на разных нодах в кластере. Для организации кластера нужно использовать akka-cluster, однако в статье для простоты не будем организовывать кластер, а просто обратимся к одному удалённому актору AlwaysCompletesRequest.

Нам нужно создать ActorSystem с поддержкой akka-remote на обеих JVM: обращающейся к актору и хостящей его. Для этого добавим в application.conf обоих сервисов такую конфигурацию:

akka {
  actor {
    provider = remote
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "some-hostname"
      port = 2552 # выберем такой порт для сервера; для клиента может быть и какой-то другой
    }
  }
}

Также необходимо добавить зависимость от akka-remote для обоих сервисов, пример для SBT:

libraryDependencies += "com.typesafe.akka" %% "akka-remote" % "2.5.7"

Теперь создадим актор на сервере:

val system = ActorSystem("server")
system.actorOf(Props(new AlwaysCompletesRequest), "always-complete-actor")

Затем получим его на клиенте:

val system = ActorSystem("client")
val actor = system.actorSelection("akka.tcp://server@some-hostname:2552/user/promise-actor")
val gimme = new GimmeActor {
  override val actor: ActorRef = actor
}
val future = gimme.function("give me 42, please!")

И запустили мы сервер вместе с клиентом… И сразу же получили залипшую фьючу!

В логах видим следующее исключение:

akka.remote.MessageSerializer$SerializationException: Failed to serialize remote message [class akka.actor.ActorSelectionMessage] using serializer [class akka.remote.serialization.MessageContainerSerializer].
Caused by: java.io.NotSerializableException: scala.concurrent.impl.CallbackRunnable

Таким образом, при попытке отправить промис удалённому актору мы вполне предсказуемо получили ошибку сериализации промиса. На самом деле, даже если бы мы и смогли сериализовать и передать промис, закомплитился бы он только на удалённой JVM, а в нашей JVM так и остался бы залипшим. Таким образом, передача промиса в актор работает только при локальной передаче сообщений, то есть данный паттерн отправки промиса в актор плохо масштабируется.

Варианты решения проблем паттерна

Завершение по таймауту

Как самый очевидный вариант решения проблемы — фейлить промис по таймауту. Посредством Akka можно сделать это, например, следующим образом:

// для .seconds
import scala.concurrent.duration._

val system: ActorSystem = ...
val timeout = 2.seconds // или любой другой резонный таймаут в конкретном случае
system.scheduler.scheduleOnce(timeout)(p.tryFailure(new SomeTimeoutException))

Методы Promise, которые мы рассматривали ранее, завершаются успешно, только если Promise не был завершён ранее. Если же эти методы вызываются уже после завершения промиса, они кидают IllegalStateException. Для случаев, когда необходимо попытаться завершить промис, когда он, вероятно, уже был завершён, есть методы, аналогичные рассмотренным, но с префиксом try в названии. Они возвращают true, если завершили промис самостоятельно; false, если промис был завершён раньше вызова этого метода.

Также, как вариант, можно фейлить фьючу по таймауту прямо внутри актора:

class AlwaysAlwaysCompletesRequest extends Actor {

  def receive: Receive = {
    case Request(str, p) =>
      p.completeWith(someFuture())
      context.scheduler.scheduleOnce(timeout)(self ! Timeout(p))
    case Timeout(p) =>
      p.tryFailure(new SomeTimeoutException)
  }
}

Разумеется, этот вариант никак не решает проблему масштабируемости.

ask-паттерн

Можно было, конечно, сделать и иначе. Например, воспользоваться ask-паттерном, который требует передачу таймаута:

import akka.pattern.ask
import akka.util.Timeout

case class Request(arg: String) // больше не нужно передавать промис

def function(arg: String): Future[String] = {
  implicit val timeout = Timeout(2.seconds)
  val any: Future[Any] = actor ? Request(arg) // timeout передаётся неявно в ?
  any.mapTo[String]
}

В таком случае и реализация актора должна быть несколько иной: вместо завершения промиса нужно ответить на сообщение:

class AlwaysCompletesRequest extends Actor {

  def receive: Receive = {
    case Request(arg) =>
      // отвечаем отправителю сообщения: с этим результатом фьюча и завершится
      sender() ! "42"
}

Впрочем, лёгкость реализации таит за собой опасности.

  1. Теперь фьюча, "выходящая" из актора, не является типобезопасной. Компилятор сможет поймать нас за руку, если мы попытаемся закрыть промис значением неверного типа. Однако он не защитит нас в случае преобразования типа из Any.
  2. Этот аргумент может показаться несколько субъективным, однако не могу с ним не согласиться. Часто считается, что дизайн с использованием tell обычно легче в понимании для модели акторов.
  3. ask "под капотом" создаёт актор, который должен быть доступен по akka-remote, а также создаёт промис, а также некий "мост" между ним и ActorRef (впрочем, если всё равно нужен remote, то это не проблема). По этой причине в документации Akka рекомендуется предпочитать tell для производительности, и использовать ask, только если это крайне необходимо.

Использовать Akka Typed

Как было сказано ранее, ask-паттерн требует передачу таймаута, благодаря чему фьюча не зависнет, даже если актор никогда не ответит на сообщение. Однако классические акторы Akka при использовании ask-паттерна возвращают Future[Any]. Разумеется, это приводит к необходимости небезопасного приведения типа в рантайме, как можно было заметить в предыдущем примере:

(actor ? Request(arg)).mapTo[String]

Таким образом, если актор ответит сообщением типа, отличного от String, фьюча сфейлится. В качестве экспериментального решения данной проблемы можно использовать экспериментальный же модуль Akka Typed. Подключим его:

libraryDependencies += "com.typesafe.akka" %% "akka-typed" % "2.5.7"

Теперь напишем ещё один актор, на сей раз типизированный:

import akka.typed._
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.Future
import scala.concurrent.duration._

case class Request(arg: String, replyTo: ActorRef[String])

// описываем поведение актора
val typeSafeActor = Actor.immutable[Request] { (_, msg) =>
  msg.replyTo ! "42"
  // возвращаем новое поведение актора: в данном случае оно остаётся тем же
  Actor.same
}

val system: ActorSystem[Request] = ActorSystem(typeSafeActor, "type-safe-actor")

def function(arg: String): Future[String] = {
  implicit val timeout: Timeout = Timeout(2.seconds)
  system ? (Request(arg, _))
}

Теперь наша фьюча типобезопасна. Впрочем, соображения дизайна и перформанса остаются в силе, как и в пункте 2. Кроме того, мы можем и не хотеть использовать в продакшн-коде API с пометкой "может измениться".

Переписать всё, в т.ч. код клиентов, на акторы

В случае использования рассматриваемого паттерна с акторами проблема возникает, фактически, из-за того, что мы должны обращаться к актору извне ActorSystem. Можно сделать примерно такое изменение:

case class Request(id: Long)

case class Response(hotel: Hotel)

// актор юзера нашего API
class CallingActor(actor: ActorRef) extends Actor {

  def receive: Receive = {
    case response: String =>
      doSomethingWithActorResponse(response)
    case request: Request => 
      actor ! request
      // также можно зашедулить отмену ожидания ответа по таймауту
  }
}

// наш актор
class AlwaysCompletesRequest extends Actor {

  def receive: Receive = {
    case Request(arg) =>
      // отвечаем отправителю сообщения
      sender() ! "42"
}

При прямом взаимодействии акторов друг с другом необязательно заворачивать ответ в промис, а можно просто вернуть его в ответном сообщении. Однако для консьюмеров API часто удобнее просто получить фьючу, чем разбираться с протоколом взаимодействия с нашим актором.

Оставить всё, как есть

В исходном коде Akka этот паттерн часто используется, что в некоторой степени показывает его жизнеспособность и удобность. Если мы осознанно используем его, следует помнить о рекомендации, которую желательно применять всегда, но особенно важно в данной ситуации: близкое к 100%-ному, а ещё лучше 100%-ное покрытие кода, связанного с промисами, обязательно. Компилятор не может сообщить нам о незакрытом ресурсе, поэтому придётся весьма агрессивно тестировать, что мы всегда сами закрываем его руками. Кроме того, необходимо осознавать, что мы не сможем передавать промис актору по сети, что может привести к проблемам при масштабировании.

Если мы всё же решаем использовать рассматриваемый паттерн, в таком случае бывает полезно выделять часть кода, в которой мы с высокой долей вероятности уверены, что она вернёт завершающуюся фьючу. Например, для создания фьючи используются только сторонние библиотеки с малой вероятностью содержания багов в месте вызовов. Для примера отрефакторим код актора, обращающегося к букингу:

override def receive: Receive = {
  case Request(id, p) =>
    p.completeWith(doRequest(id))
    p.future.onComplete(self ! Completed)
    context.become(running)
}

def doRequest(id: Long): Future[Hotel] = {
  val uri = Uri(baseUri).withQuery(Query("id" -> id.toString))
  val eventual = Http().singleRequest(HttpRequest(Uri(uri = uri)))
  eventual.flatMap { response =>
    val unmarshalled = Unmarshal(response.entity)
    response.code match {
      case StatusCodes.OK =>
        unmarshalled.to[Hotel]
      case _ =>
        Future.failed(new Exception(unmarshalled.to[String]))
    }
  }
}

Этот код имеет следующие преимущества относительного старого:

  1. код работы с промисом стал тривиален;
  2. для обработки ответа от API мы теперь используем более функциональный flatMap, а не более императивный onComplete;
  3. теперь наш букинг-клиент возвращает некомплитящуюся фьючу только в случае бага в Akka-HTTP.

Этот подход часто применим при передаче промиса в другие асинхронные функции, а не только в actor.tell().

Теперь сравним рассмотренные методы по нескольким критериям:

  • Масштабируемость — возможность использовать метод на нескольких нодах также, как в рамках одной JVM.
  • ask/tell — паттерн Akka, использующийся под капотом для передачи сообщения.
  • Типобезопасность:
    + — все элементы метода типизированы;
    ± — используются стандартные акторы, которые, строго говоря, не типизированы; однако сама фьюча всё же типизирована;
    - — используются стандартные акторы и даже фьюча не типизирована.
  • Отсутствие зависаний — всегда ли фьюча завершается (по таймауту).
  • Стабильность подхода — стабильность используемого методом API, минус стоит только для экспериментального typed ask.
  • Удобство API — для потребления консьюмерами: как правило, просто обработать фьючу удобнее, нежели вникать в протокол взаимодействия с актором.

таймаут ask typed ask всё на акторах promise
масштабируемость - + + + -
ask/tell tell ask ask tell tell
типобезопасность ± - + ± ±
отсутствие зависаний + + + + -
стабильность подхода + + - + +
удобство API + + + - +

Кроме того, хотелось бы уточнить, что метод с таймаутом относительно метода без него имеет также недостаток в виде немного усложнённого кода.

В заключение хочется сказать, что хотелось бы иметь некую конструкцию, которая будет работать с промисами, переданными в актор, как ресурсами. Впрочем, с этим есть множество проблем, например:

  1. не всегда на этапе компиляции известен тип актора, в который мы передаём сообщение;
  2. актор может передавать промис в другой актор или комплитить его фьючей, которая, в свою очередь, не завершится;
  3. создание и завершение промиса обычно происходит в разных участках кода и разных потоках.

Возможно, было бы реально хотя бы в некоторых случаях выявлять баги посредством статического анализа кода.

Если вам что-либо известно о таких или иных способах борьбы с проблемами паттерна, расскажите о них в комментариях!

Автор: Евгений Веретенников

Источник

Поделиться

* - обязательные к заполнению поля