Работа с Aerospike на scala при помощи магии макросов

в 9:02, , рубрики: Aerospike, macros, nosql, scala, Блог компании Тинькофф Банк, метки:

Работа с Aerospike на scala при помощи магии макросов

N|Solid

В нашем отделе бигдаты часть данных хранится в Aerospike. Потребителей довольно много, среди них два приложения, написанных на Scala, взаимодействие с базой в которых будет расширено в связи с постоянно растущими требованиями бизнеса. Единственным приличным драйвером для нас был джавовый клиент, упомянутый на сайте самой базы данных aerospike.com (http://www.aearospike.com/docs/client/java). Конвертация скаловых типов данных (а особенно иерархических) в соответствующие аэроспайковские типы приводит к большому количеству бойлерплейта. Чтобы этого избежать, необходим более удобный, а заодно и типобезопасный интерфейс.

Инженеры не любят писать много раз один и тот же код и стараются упростить и оптимизировать все повторяющиеся действия. Такие задачи часто решает кодогенерация. Поэтому мы решили написать свою библиотеку для работы с Aerospike, пользуясь макросами.

Немного про Aerospike

Aerospike — это распределённая schema-less key-value база данных, работающая по принципу хэш-таблицы. Она активно используется в нашем банке для построения распределённых кэшей и для задач, требующих низкого времени отклика. База легко устанавливается и без проблем администрируется, что упрощает её внедрение и поддержку.

О модели хранения: параметры namespace и setName связаны с ключами записей, а сами данные хранят в так называемых бинах. Значения могут быть различных типов: Integers, Strings, Bytes, Doubles, Lists, Maps, Sorted Maps, GeoJSON. Интересно, что тип бина не является фиксированным и, записав, скажем, Integer, можно затем перезаписать его на любой другой. Драйверы, написанные для этой базы, обладают изрядным количеством кода для сериализации значений внешней модели во внутреннюю.

Про создание DSL

Рассмотрим на простых примерах процесс проектирования нашего DSL, почему мы решили использовать макросы, и что из этого всего получилось.

В условиях ограниченного времени (взаимодействие с этой базой только малая часть проекта) сложно написать целиком клиент с реализацией протокола. К тому же это потребовало бы больше усилий в поддержке. Поэтому мы остановились на создании обёртки для уже существующего клиента.

Рассмотрим на примерах.

В качестве базиса использован Aerospike Java Client версии 3.3.1 (его можно найти на сайте www.aerospike.com, исходники есть на Гитхабе), немалая часть методов в котором оперирует с ключами и бинами из пакета com.aerospike.client. Java Client поддерживает работу с базой как в синхронном, так и в асинхронном режиме. Мы используем асинхронный com.aerospike.client.async.AsyncClient. Самый простой способ его создать:

val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)

где hosts — это List[String], содержащий хосты вашей базы, а port — порт типа Int (по дефолту 3000).

Если при создании клиента передать невалидные значения хостов или неверный порт драйвер выкинет ошибку, потому что при вызове проверяет соединение:

scala> new AsyncClient(new AsyncClientPolicy, List().map(new Host(_, port)): _*)
com.aerospike.client.AerospikeException$Connection: Error Code 11: Failed to connect to host(s):

Таблица соответствий типов в DSL, Java CLient и базе данных
| Scala         | Java Client   | Aerospike     |
|-------------- |-------------- |-----------    |
| Int           | IntegerValue  | Integer       |
| Long          | LongValue     | Integer       |
| String        | StringValue   | String        |
| Boolean       | BooleanValue  | Integer       |
| Float         | FloatValue    | Double        |
| Double        | DoubleValue   | Double        |
| Seq           | ListValue     | List          |
| Map           | MapValue      | Map           |
| Char          | StringValue   | String        |
| Short         | IntegerValue  | Integer       |
| Byte          | IntegerValue  | Integer       |
| HList         | MapValue      | Map           |
| case class T  | MapValue      | Map           |

Из таблицы видно, что впереди довольно много однотипных преобразований. Писать все это руками каждый раз желания нет.

Первая мысль была про рефлекшен, но рантаймовый вариант нас не устраивает — это долго и дорого. Остается вариант с компайл-тайм рефлекшеном, который позволит сгенерировать конвертеры и получать сообщения об ошибках еще на стадии компиляции.

В методах интерфейса нашей DSL для любых действий с базой мы будем передавать только конкретные значения ключей и бинов, а все преобразования за нас сделают макросы. Основная идея была в том, чтобы избавиться от бойлерплейта и уберечь пользователя от досконального изучения внутренней структуры данных самого Aerospike. Мы предварительно описали наиболее оптимальный вариант хранения, опираясь на тип переданного для записи значения.

Рассмотрим на примере одной из самых распространенных операций с Aerospike — добавления записи с последующим ее чтением по ключу. Будем использовать метод Put. Для начала нам нужны функции преобразования значений определенных типов во внутренние модели драйвера: ключей в com.aerospike.client.Key, а бинов в com.aerospike.client.Bin.
Пусть ключ будет String, а записывать в различных сервисах будем бины типов String, Int, Boolean.

Напишем функцию преобразования ключа:

import com.aerospike.client.Key
def createStringKey(namespace: String, setName: String, value: String): Key =
   new Key(namespace, setName, new StringValue(value))

и бинов соответственно:

import com.aerospike.client.Value.{IntegerValue, StringValue, BooleanValue}

def createStringBin(name: String, value: String): Bin = new Bin(name, new StringValue(value))
def createIntBin(name: String, value: Int): Bin = new Bin(name, new IntegerValue(value))
def createBooleanBin(name: String, value: Boolean): Bin = new Bin(name, new BooleanValue(value))

Сигнатура нужного нам метода в библиотеке на java (вариантов несколько, мы берем с наименьшим количеством параметров):

public void put(WritePolicy policy, Key key, Bin... bins) throws AerospikeException;

Значит, вызовы с использованием этой библиотеки будут выглядеть так:

import com.aerospike.client.policy.WritePolicy

client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue1"),
   Seq(createStringBin("binName1", "binValue1"), createStringBin("binName2", "binValue2")): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue2"),
   Seq(createIntBin("binName1", 2), createIntBin("binName2", 4)): _*)
client.put(new WritePolicy, createStringKey("namespace", "setName", "keyValue3"),
   Seq(createBooleanBin("binName1", true), createBooleanBin("binName2", false)): _*)

Не слишком симпатично, правда? Попробуем упростить:

 def createKey[T](ns: String, sn: String, value: T): Key = {
   val key = value match {
     case s: String => new StringValue(s)
     case i: Int => new IntegerValue(i)
     case b: Boolean => new BooleanValue(b)
     case _ => throw new Exception("Not implemented")
   }
   new Key(ns, sn, key)
 }

 def createBin[T](name: String, value: T): Bin = {
   value match {
     case s: String => new Bin(name, new StringValue(s))
     case i: Int => new Bin(name, new IntegerValue(i))
     case b: Boolean => new Bin(name, new BooleanValue(b))
     case _ => throw new Exception("Not implemented")
   }
 }

 def putValues[K, B](client: AsyncClient, namespace: String, setName: String,
                     keyValue: K, bins: Seq[(String, B)])(implicit wPolicy: WritePolicy): Unit = {
   client.put(wPolicy, createKey(namespace, setName, keyValue), bins.map(b => createBin(b._1, b._2)): _*)
 }

Теперь надо избавиться от функций createKey и createBin, добавим магии имплиситов.

Нам понадобятся служебные объекты, которые будут на основе типов входных данных генерировать соответствующие модели используемого драйвера:

KeyWrapper: [K => Key]
BinWrapper: [B => Bin]

Теперь можно собрать всю логику в один метод:

case class SingleBin[B](name: String, value: B)

def putValues[K, B](client: AsyncClient, key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K],
 bC: BinWrapper[B], wPolicy: WritePolicy): Unit = client.put(wPolicy, kC(key), bC(value))

где WritePolicy — объект контейнер, содержащий различные параметры записи. Мы будем пользоваться дефолтным, создавая его так scala new WritePolicy.

Очевидно, что самым банальным вариантом будет описать создание врапперов всех типов. Но зачем это делать, когда мы знаем, как именно будет создаваться каждый из инстансов? Вот здесь нам пригодятся макросы.

Простейший вариант — описать создание того или иного типа конвертера при помощи квазиквот. Начнем с ключей:

 trait KeyWrapper[KT] {

   val namespace: String = ""
   val setName: String = ""

   def apply(k: KT): Key

   def toValue(v: KT): Value = v match {
     case b: Int => new IntegerValue(b)
     case b: String => new StringValue(b)
     case b: Boolean => new BooleanValue(b)
     case _ => throw new Exception("not implemented")
   }
 }

 object KeyWrapper {

   implicit def materialize[T](implicit dbc: DBCredentials): KeyWrapper[T] = macro impl[T]

   def impl[T: c.WeakTypeTag](c: Context)(dbc: c.Expr[DBCredentials]): c.Expr[KeyWrapper[T]] = {
     import c.universe._
     val tpe = weakTypeOf[T]

     val ns = reify(dbc.splice.namespace)
     val sn = reify(dbc.splice.setname)

     val imports =
       q"""
         import com.aerospike.client.{Key, Value}
         import collection.JavaConversions._
         import com.aerospike.client.Value._
         import scala.collection.immutable.Seq
         import ru.tinkoff.aerospikescala.domain.ByteSegment
         import scala.util.{Failure, Success, Try}
        """

     c.Expr[KeyWrapper[T]] {
       q"""
       $imports
       new KeyWrapper[$tpe] {
         override val namespace = $ns
         override val setName = $sn
         def apply(k: $tpe): Key = new Key(namespace, setName, toValue(k))
       }
      """
     }
   }
 }

где DBCredentials содержит namespace и setName.

Таким образом мы можем описать метод для сервиса, при компиляции которого будут самостоятельно генерироваться конвертеры.
N|Solid

С бинами у нас ситуация несколько сложнее. Необходимо доставать значения, сохраненные в базе, предварительно преобразованные во внутренний формат Aerospike. Для этого воспользуемся самым простым из методов драйвера:

public Record get(Policy policy, Key key) throws AerospikeException;

где возвращаемое значение:

public Record(
     Map<String,Object> bins,
     int generation,
     int expiration
  )

а необходимые нам данные лежат в Map<String,Object> bins. Тут возникает проблема (см. таблицу соответствий). Так как наша цель — генерировать конвертеры на этапе компиляции и обеспечить на выходе значение типа, идентичного записанному ранее, нам надо предсказать, как именно описать функцию, достающую нужное нам вэлью из базы. Помимо прочего типы, которые мы получаем в bins из пакета java.util — значит, нам пригодятся конвертеры из соответствующих пакетов scala.collection.
Теперь напишем конвертер для бинов:

trait BinWrapper[BT] {

 import com.aerospike.client.Value._
 import com.aerospike.client.{Bin, Record, Value}
 import scala.collection.JavaConversions._
 import scala.collection.immutable.Map
 import scala.reflect.runtime.universe._

 type Singleton = SingleBin[BT]
 type Out = (Map[String, Option[BT]], Int, Int)

 def apply(one: Singleton): Bin = {
   if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bean name is 14 characters")
   else new Bin(one.name, toValue(one.value))
 }

 def toValue(v: BT): Value = v match {
   case b: Int => new IntegerValue(b)
   case b: String => new StringValue(b)
   case b: Boolean => new BooleanValue(b)
   case _ => throw new Exception("not implemented")
 }

 def apply(r: Record): Out = {
   val outValue: Map[String, Option[BT]] = {
     val jMap = r.bins.view collect {
       case (name, bt: Any) => name -> fetch(bt)
     }
     jMap.toMap
   }
   if (outValue.values.isEmpty && r.bins.nonEmpty) throw new ClassCastException(
     s"Failed to cast ${weakTypeOf[BT]}. Please, implement fetch function in BinWrapper")
   else (outValue, r.generation, r.expiration)
 }

 def fetch(any: Any): Option[BT]
}

Метод apply принимает в качестве параметра Record — тут обобщить можно всё до момента разбора непосредственно типа значения. Реализацию этого метода проще написать на макросах:

object BinWrapper {

 implicit def materialize[T]: BinWrapper[T] = macro materializeImpl[T]

 def materializeImpl[T: c.WeakTypeTag](c: blackbox.Context): c.Expr[BinWrapper[T]] = {
   import c.universe._
   val tpe = weakTypeOf[T]
   val singleton = weakTypeOf[SingleBin[T]]
   val out = weakTypeOf[(Map[String, Option[T]], Int, Int)]
   val tpeSt = q"${tpe.toString}"

   val fetchValue = tpe match {
     case t if t =:= weakTypeOf[String] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: String => Option(v)
       case oth => scala.util.Try(oth.toString).toOption
     } """
     case t if t =:= weakTypeOf[Boolean] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: java.lang.Long => Option(v == 1)
       case _ => None
     } """
     case t if t =:= weakTypeOf[Int] => q"""override def fetch(any: Any): Option[$tpe] = any match {
       case v: java.lang.Long => Option(v.toInt)
       case oth => scala.util.Try(oth.toString.toInt).toOption
     } """
     case t if t.toString.contains("HNil") || t.toString.contains("HList") =>
       q"""override def fetch(any: Any): Option[$tpe] = any match {
             case m: java.util.HashMap[Any, Any] =>
             val newList = castHListElements(m.asScala.values.toList, $tpeSt)
             newList.toHList[$tpe]
             case oth => None
           } """
     case _ => q""""""
   }

   val imports =
     q"""
        import java.util.{List => JList, Map => JMap}
        import com.aerospike.client.{Bin, Record, Value}
        import com.aerospike.client.Value.{BlobValue, ListValue, MapValue, ValueArray}
        import scala.collection.JavaConversions._
        import scala.collection.JavaConverters._
        import shapeless.{HList, _}
        import shapeless.HList.hlistOps
        import syntax.std.traversable._
        ....
      """

   c.Expr[BinWrapper[T]] {
     q"""
     $imports

     new BinWrapper[$tpe] {
       override def apply(one: $singleton): Bin = {
          if (one.name.length > 14) throw new IllegalArgumentException("Current limit for bean name is 14 characters")
          else new Bin(one.name, toValue(one.value))
        }
       override def apply(r: Record): $out = {
          val outValue: Map[String, Option[$tpe]] = {
          val jMap = r.bins.view collect {
           case (name, bt: Any) =>
           val res = fetch(bt)
           if (res.isEmpty && r.bins.nonEmpty) throwClassCast($tpeSt) else name -> res
          }
         jMap.toMap
         }

        (outValue, r.generation, r.expiration)
       }
       $fetchValue
     }

   """
   }
 }
}

Макросы сделали за нас всю работу — инстансы всех требуемых конвертеров будут генерироваться самостоятельно, вызовы методов будут содержать только сами значения ключей и бинов.
N|Solid

С Quasiquotes работать легко: поведение предсказуемое, подводных камней нет. Важно помнить, что при использовании такого подхода все библиотеки, которые нужны в описанных в Quasiquotes методах, должны быть импортированы в файл, где используется макрос. Поэтому я сразу добавила параметр imports в обоих конвертерах, чтобы не копировать множество библиотек в каждом файле.

Теперь у нас есть всё, чтобы написать сервис-обертку:

class SpikeImpl(client: IAsyncClient) {

 def putValue[K, B](key: K, value: SingleBin[B])(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Unit = {
   val wPolicy = new WritePolicy
   client.put(wPolicy, kC(key), bC(value))
 }

  def getByKey[K, B](k: K)(implicit kC: KeyWrapper[K], bC: BinWrapper[B]): Option[B] = {
   val policy = new Policy
   val record = client.get(policy, kC(k))
   bC.apply(record)._1.headOption.flatMap(_._2)
 }
}

Теперь можно проверить работу нашего сервиса:

import shapeless.{HList, _}
import shapeless.HList.hlistOps
import scala.reflect.macros.blackbox._
import scala.language.experimental.macros

object HelloAerospike extends App {

 val client = new AsyncClient(new AsyncClientPolicy, hosts.map(new Host(_, port)): _*)
 val database = new SpikeImpl(client)
 implicit val dbc = DBCredentials("namespace", "setName")

   database.putValue("key", SingleBin("binName", 123 :: "strValue" :: true :: HNil))
   val hlistBin = database.getByKey[String, Int :: String :: Boolean :: HNil]("key")
     .getOrElse(throw new Exception("Failed to get bin value"))
   println("hlistBin value = " + hlistBin)

}

Запускаем и заходим в базу:

Mac-mini-administrator-5:~ MarinaSigaeva$ ssh user@host
user@host's password:
Last login: Wed Nov 23 19:41:56 2016 from 1.1.1.1
[user@host ~]$ aql
Aerospike Query Client
Version 3.9.1.2
Copyright 2012-2016 Aerospike. All rights reserved.
aql> select * from namespace.setName
+------------------------------------------+
| binName                                  |
+------------------------------------------+
| MAP('{"0":123, "1":"strValue", "2":1}')  |
+------------------------------------------+
1 row in set (0.049 secs)
aql>

Данные записаны. Теперь посмотрим, что приложение вывело в консоль:

[info] Compiling 1 Scala source to /Users/Marina/Desktop/forks/playground/target/scala-2.11/classes...
[info] Running HelloAerospike
hlistBin value = 123 :: strValue :: true :: HNil
[success] Total time: 0 s, completed 23.11.2016 20:01:44

Для scala разработчиков решение может быть более понятным интуитивно, чем java библиотека. Код текущего DSL выложен на Гитхабе с подробным описанием how to и кукбуком, который будет дополняться.
В свете последних событий (scala 2.12 released) появилась задача для интересных экспериментов со scala-meta.
Надеюсь этот опыт будет вам полезен в решении подобных задач.

Автор: Тинькофф Банк

Источник

Поделиться

* - обязательные к заполнению поля