Реализация yield в Kotlin

в 17:56, , рубрики: kotlin, yield, Программирование, метки: ,

Недавний пост о выходе Kotin M5 вызвал у меня желание немного поиграться с языком (оно появлялось и раньше, но руки никак не доходили). Писать классические hello world'ы мне было неинтересно, я и решил придумать какую-нибудь заковыристую задачку, которая еще позволит использовать различные интересные фишки языка.
Напомню, что Kotlin — статически-типизированный язык программирования, который может компилироваться в байткод JVM или в JavaScript. Разрабатывается компанией Jetbrains.

Котлин имеет множество различных синтаксических сладостей, что позволяет делать довольно интересные вещи. В официальной документации есть пример(см. Higher-order functions) реализации конструкции synchronized в виде обычной функции, причем ровно так, как она выглядит в Java.
У меня родилась мысль реализовать подобным образом yield.

Yield

Многие C#-разработчики знают и используют конструкцию yield.
Yield — это пример реализации сопрограмм. Сопрограмма является обобщением понятия процедуры и поддерживает множество точек входа. В случае с языком C# сопрограммы, написанные с применением yield, позволяют генерировать элементы IEnumerable на лету:

static IEnumerable<int> CountToTen()
{
  for (int i = 0; i <= 10; ++i)
  {
    yield return i;
  }
}

С помощью yield можно написать бесконечный список:

IEnumerable<int> Fibonacci()
{
  int a = 0, b = 1;
  while (true)
  {
    int sum = a + b;        
    yield return sum;
    a = b;
    b = sum;
  }
}
//---
foreach (var i in Fibonacci().Take(10))
{
  Console.WriteLine(i);
}

В Шарпе yield реализован на уровне компилятора: из тела yield-метода генерируется Enumerator, представляющий собой state-машину. Более подробно можно почитать тут

И, хотя для Java есть реализации yield, работающие за счет модификации байт-кода и генерирущие подобные state-машины, такой подход весьма сложный и требует дополнительных действий с кодом: дополнительная стадия компиляции или использование кастомных classloader'ов.
Для решения нашей задачки мы пойдем другим путем — будем использовать дополнительный поток.

Реализация

Итак, идея достаточно прозрачна — нужно реализовать схему Producer-Consumer с использованием дополнительно потока, который будет производить непосредственное вычисление следующего значения последовательности по требованию основого потока.

Как и в случае с synchronized, наши yield-генераторы будут создаваться с помощью внешней функции (Котлин поддерживает функции вне классов), которая будет принимать в качестве аргумента еще одну функцию, и возвращать Iterable<T>. Назовем ее yieldfun. Пользовательский код будет выглядеть примерно следующим образом:

fun Test() = yieldfun<Int> { (context) -> 
    for (val i in 0 .. 3)
        context.yield(i); 
}

context — некий интерфейс, который будет предоставлять пользовательскому коду операцию yield.

Начнем с реализации самой yieldfun, ее код достаточно прост:

fun yieldfun<T> (body : (YieldContext<T>) -> Unit) : Iterable<T> {
  return YieldIterable<T>(body)
}

Тело функции представлят собой создание объекта YieldIterable<T> с передачей в него аргумента body — пользовательской лямбда-функции. Однако, в этом случае Котлин позволяет записать такую функцию еще короче:

fun yieldfun<T> (body : (YieldContext<T>) -> Unit) : Iterable<T> = YieldIterable<T>(body)

Интерфейс Iterable<T> имеет единственный метод, возвращающий Iterator<T>, поэтому код YieldIterable<T> тоже не содержит ничего интересного:

private class YieldIterable<T>(val body: (YieldContext<T>) -> Unit): Iterable<T>{
    public override fun iterator(): Iterator<T> = YieldIterator<T>(body)
}

Входным параметром функции body является интерфейс(точнее trait) YieldContext<T>, который объявлен следующим образом:

trait YieldContext<T> {
    fun yield(value : T)
}

Как было сказано выше, он содержит метод yield(T) для проталкивания сгенерированных значений из юзерского кода.

Теперь перейдем к ядру нашего yield — классу YieldIterator<T>:

private class YieldIterator<T> (val body: (YieldContext<T>) -> Unit): Iterator<T>, YieldContext<T> {
    private var thread: Thread? = null
    private val resultQueue = SynchronousQueue<Message>()
    private val continuationSync = SynchronousQueue<Any>()
    private var currentMessage: Message? = null

    {
        val r = object : Runnable {
            public override fun run() {
                try {
                    continuationSync.take()
                    body(this@YieldIterator)
                    resultQueue.put(CompletedMessage())
                }
                catch (e: Throwable) {
                    resultQueue.put(ExceptionThrownMessage(e))
                }
            }
        }
        thread = Thread(r)
        thread!!.start()
    }

    override fun yield(value: T) {
        resultQueue.put(ValueMessage(value))
        continuationSync.take()
    }

    public override fun next(): T {
        evaluateNext()
        if (currentMessage is ExceptionThrownMessage)
            throw (currentMessage as ExceptionThrownMessage).exception
        if (currentMessage !is ValueMessage)
            throw NoSuchElementException()
        val value = (currentMessage as ValueMessage).value as T
        currentMessage = null
        return value
    }
    public override fun hasNext(): Boolean {
        evaluateNext()
        if (currentMessage is ExceptionThrownMessage)
            throw (currentMessage as ExceptionThrownMessage).exception
        return currentMessage is ValueMessage
    }

    private val dummy = Any()

    private inline fun evaluateNext() {
        if (currentMessage == null) {
            continuationSync.put(dummy)
            currentMessage = resultQueue.take()
        }
    }
}

Итак, посмотрим, что здесь происходит:
в классе объявлен поток и две синхронные очереди, которые служат для взаимодействия между основным и рабочим потоком. Напомню, что синхронная очередь — разновидность блокирующей очереди, в которой оба метода put и take не будут заблокированы, только если другой поток уже ждет выполнения парной операции (подробнее тут).

Первая очередь resultQueue служит для проталкивания очередного результата вычисления в рабочем потоке thread в основной поток, а вторая — continuationSync — для сигнала о том, что рабочему потоку можно вычислять следующий элемент.

В конструкторе происходит запуск рабочего потока, который сразу же блокируется на очереди continuationSync. Далее происходит следующее:
1) когда пользовательский код из основного потока вызывает next() или hasNext() у нашего итератора, то этот поток сообщает рабочему, чтобы он разблокировался (см. метод evaluateNext()), а сам блокируется в ожидании результата.
2) рабочий поток начинает выполнение лямбды body, т.е. юзерского кода.
3) юзерский код в каком-то месте вызывает метод yield(T), в котором аргумент (очередной посчитанный элемент) толкается в очередь, что вызывает разблокирование основного потока. Сам же рабочий поток при этом снова блокируется, ожидая команды продолжения.
4) когда выполнение body завершается, в очередь кладется сообщение о завершении, которое будет трактоваться как конец поседовательности.

Кстати, сами классы сообщений элементарны:

private open class Message {
}
private class ValueMessage(public val value: Any): Message() {
}
private class CompletedMessage: Message() {
}
private class ExceptionThrownMessage(public val exception: Throwable): Message() {
}

Все отлично, можно попробовать запустить функцию Test() из начала статьи:

assertEquals(listOf(0, 1, 2, 3), Test().toList())

Наведение красоты

Однако возможности Котлина позволяют оформить yieldfun немного красивее и интереснее.

Избавляемся от явного context

В принципе, Котлин разрешает опускать название аргумента при передаче лямбды в вызов, если аргумент один. Тогда обращаться к нему можно по имени it:

fun Test() = yieldfun<Int> {
    for (val i in 0 .. 3)
        it.yield(i);
}

Но все же можно сделать еще интереснее.
Котлин поддерживает т.н. Extension function literals — в типе нашего аргумента body мы можем указать, что body должен быть методом расширения для какого либо класса. Сделаем его методом расширения для класса YieldContext<T> и уберем его первый аргумент:

public fun yieldfun<T: Any>(body: YieldContext<T>.() -> Unit): Iterable<T> = YieldIterable<T>(body)

Аналогично сделаем такую же замену типа аргумента body во всех остальных наших классах.
Что это дает? Теперь мы можем писать такой код:

fun Test() = yieldfun<Int> {
    for (val i in 0 .. 3)
        yield(i);
}

Т.к. теперь переданная в yieldfun функция является методом расширения для класса YieldContext<T>, то мы можем в этой функции обращаться к объекту типа YieldContext<T> через this:

this.yield(i)

или, опустив this,

yield(i)

Т.к. мы изменили тип агрумента body, то его вызов внутри потока нужно производить так:

continuationSync.take()
body()
resultQueue.put(CompletedMessage())

Это сработает потому, что YieldIterator является реализацией YieldContext, для которого body — extension-метод.

Избавляемся от скобок

Круто. Но можно добавить еще один штрих — мне не нравятся скобки при вызове yield :)

Добавим в YieldContext<T> свойство ret:

public trait YieldContext<T> {
    fun yield(value: T): Unit
    val ret: YieldContext<T>
}

А в реализации в классе YieldIterator будем возвращать this:

override val ret: YieldContext<T> = this

Какой же от этого толк? Мы всего лишь сможем сделать такой вызов:

ret.yield(i)

Но Котлин поддерживает инфиксную форму вызовов для функций от двух аргументов, а так как метод yield(T) имеет еще и неявный параметр this, то предыдущий вызов можно записать так:

ret yield i

Как видим, скобки больше не нужны.

Итак, пример с числами Фибоначчи на C# можно теперь написать и на Kotlin:

fun fibonacci(): Iterable<Int> = yieldfun {
    var a = 0
    var b = 1
    while (true)
    {
        val sum = a + b;
        ret yield sum;
        a = b;
        b = sum;
    }
}

Стоить заметить, что если мы явно укажем тип возвращаемого значения функции, то специализировать generic-параметр у yieldfun уже необязательно — сработает вывод типов.

Освобождение ресурсов

Многие наверное заметили, что наш итератор не завершит рабочий поток, если были проитерированы не все элементы последовательности. Надо бы исправить это. Для этого добавим метод finalize() в класс итератора:

protected fun finalize() {
    thread?.interrupt()
}

А в коде второго потока добавим еще один блок catch:

try {
    continuationSync.take()
    body()
    resultQueue.put(CompletedMessage())
}
catch (e: InterruptedException) {
    // if not all items were iterated yield will wait for signal, so finalizer should
    // interrupt the thread to exit
}
catch (e: Throwable) {
    resultQueue.put(ExceptionThrownMessage(e))
}

Тогда при сборке мусора, на ожидании элемента из continuationSync будет выброшено исключение, которое позволит рабочему потоку завершиться.
Однако, теперь стоит помнить, что в теле функции, передаваемой в yieldfun, нельзя кидать InterruptedException, а также нельзя оборачивать вызов yield блоком try-catch, который будет глушить исключения этого типа.

Заключение

Данный пример вряд ли подойдет для написания больших библиотек, наподобие extension-методов для коллекций в .Net, т.к. использование многопоточного подхода сильно медленнее, чем «правильная» реализация этой конструкции. Но несмотря на это, надеюсь, мне удалось показать интересные возможности нового языка от Jetbrains. Также надеюсь, что у ребят из команды Kotlin хватит ресурсов и терпения допилить язык до релиза.

Автор: nerzhul

Источник

Поделиться

* - обязательные к заполнению поля