Введение в Акторы на основе Java/GPars, Часть I

в 23:31, , рубрики: Алгоритмы, Блог компании GolovachCourses

Кратко рассматривается API библиотеки GPars и решение многопоточной задачи средней сложности, результаты которой могут быть полезны в «народном хозяйстве».

Данная статья написана в ходе исследования различных библиотек акторов, доступных Java-программисту, в процессе подготовки к чтению курса «Multicore programming in Java».

Это первая статья из цикла статей цель которых сравнить API, быстродействие и реализацию акторов Akka с реализациями в других библиотеках на некоторой модельной задаче. Данная статья предлагает такую задачу и решение на GPars.

GPars — библиотека написанная для Clojure с широкой поддержкой различных подходов к параллельным вычислениям.
Плюсы GPars

  • Исходный код написан на Java (в отличии от Akka, написанной на Scala). Всегда интересно посмотреть «что под капотом» на «родном» языке программирования
  • GPars представляет собой целый «зоопарк» подходов (Actor, Agent, STM, CSP, Dataflow)
  • GPars использует классы из runtime-библиотеки Clojure, написанной на Java. Интересно покопаться

«Установка» GPars

Подключаете в Maven GPars и Groovy

<dependency>
    <groupId>org.codehaus.gpars</groupId>
    <artifactId>gpars</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.codehaus.groovy</groupId>
    <artifactId>groovy-all</artifactId>
    <version>2.2.2</version>
</dependency>

Без Maven просто качайте из репозитория GPars-1.1.0 (sources) и Groovy-2.2.2 (sources) и подключайте к проекту.

Stateless Actor

Начнем с простой демонстрации (исключение в конце — так все и задумано)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

public class StatelessActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatelessActor().start();
        // могу послать сообщение и пойти дальше
        actor.send("Hello");
        // могу послать и ждать "ответа"
        System.err.println("main(...): " + actor.sendAndWait(42));
        // могу послать сообщение, повесить обработчик и пойти дальше
        actor.sendAndContinue(10.0, new MessagingRunnable<Object>() {
            protected void doRun(final Object response) {
                System.err.println("main(...): " + response);
            }
        });
        // ждем
        System.in.read();
    }

    private static class MyStatelessActor extends DynamicDispatchActor {
        public void onMessage(final String msg) {
            System.err.println("onMessage(String): " + msg);
            // могу НЕ отвечать sender-у
        }
        public void onMessage(final Integer msg) {
            System.err.println("onMessage(Integer): " + msg);
            // могу отвечать sender-у
            reply(1000 + msg);
        }
    }
}
>> onMessage(String): Hello
>> onMessage(Integer): 42
>> main(...): 1042
>> An exception occurred in the Actor thread Actor Thread 1
>> groovy.lang.MissingMethodException: No signature of method:
>> StatelessActorTest$MyStatelessActor.onMessage() is applicable for argument types: (java.lang.Double) values: [10.0]
>> Possible solutions: onMessage(java.lang.Integer), onMessage(java.lang.String)
>>         at org.codehaus.groovy.runtime.ScriptBytecodeAdapter.unwrap(...
>>         ...
>>         ...

Что видно
— «pattern matching» делает подбором подходящего перегруженного (overloaded) варианта метода onMessage(<one-arg>), если такового нет, то «получаем» исключение
— акторы работают на основе пула потоков-«демонов», так что нам необходимо как-то подвесить работу метода main() (я использовал System.in.read()) с целью предотвратить преждевременное завершение работы JVM
— на примере метода reply() мы видим, что при наследовании от DynamicDispatchActor в «пространство имен» актора попадает множество методов (reply, replyIfExists, getSender, terminate, ...)

Хотя авторы GPars и называют наследников класса DynamicDispatchActor — акторами-без-состояния (stateless actor), это — обычные экземпляры java-классов, которые могут иметь мутирующие поля и хранить в них свое состояние. Продемонстрируем это

import groovyx.gpars.actor.*;

import java.util.ArrayList;
import java.util.List;

public class StatelessActorTest {
    public static void main(String[] args) throws InterruptedException {
        Actor actor = new DynamicDispatchActor() {
            private final List<Double> state = new ArrayList<>();
            public void onMessage(final Double msg) {
                state.add(msg);
                reply(state);
            }
        }.start();

        System.out.println("answer: " + actor.sendAndWait(1.0));
        System.out.println("answer: " + actor.sendAndWait(2.0));
        System.out.println("answer: " + actor.sendAndWait(3.0));
        System.out.println("answer: " + actor.sendAndWait(4.0));
        System.out.println("answer: " + actor.sendAndWait(5.0));
    }
}
>> answer: [1.0]
>> answer: [1.0, 2.0]
>> answer: [1.0, 2.0, 3.0]
>> answer: [1.0, 2.0, 3.0, 4.0]
>> answer: [1.0, 2.0, 3.0, 4.0, 5.0]

Statefull Actor

Вводя деление stateless/statefull, авторы имею в виду, что Statefull Actor позволяют органично создавать реализации шаблона State. Рассмотрим простой пример (наследники DefaultActor — Statefull Actor-ы)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(Arrays.asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(Arrays.asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new MessagingRunnable<Object>(this) {
                        protected void doRun(final Object msg) {
                            System.out.println("react: " + msg);
                        }
                    });
                }
            });
        }
    }
}
>> react: A
>> react: 1.0
>> react: [1, 2, 3]
>> react: B
>> react: 2.0
>> react: [4, 5, 6]

Однако, обещанной реализацией шаблона State совсем «не пахнет». Давайте зайдем с такой стороны (Java не лучший язык для таких трюков, на Clojure/Scala этот код выглядит намного компактнее)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
      protected void act() {
        loop(new Runnable() {
          public void run() {
            react(new MessagingRunnable<String>(this) {
              protected void doRun(final String msg) {
                System.out.println("Stage #0: " + msg);
                react(new MessagingRunnable<Double>() {
                  protected void doRun(final Double msg) {
                    System.out.println("  Stage #1: " + msg);
                    react(new MessagingRunnable<List<Integer>>() {
                      protected void doRun(final List<Integer> msg) {
                        System.out.println("    Stage #2: " + msg + "n");
                      }
                  });
                }
              });
            }
          });
        }
      });
    }
  }
}
>> Stage #0: A
>>   Stage #1: 1.0
>>     Stage #2: [1, 2, 3]
>> 
>> Stage #0: B
>>   Stage #1: 2.0
>>     Stage #2: [4, 5, 6]

Ну или давайте избавимся от этой жудкой вложенности анонимных классов и «материализуем состояния»

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;

import java.util.List;
import static java.util.Arrays.asList;

public class StatefulActorTest {
    public static void main(String[] args) throws Exception {
        Actor actor = new MyStatefulActor().start();

        actor.send("A");
        actor.send(1.0);
        actor.send(asList(1, 2, 3));

        actor.send("B");
        actor.send(2.0);
        actor.send(asList(4, 5, 6));

        System.in.read();
    }

    private static class MyStatefulActor extends DefaultActor {
        protected void act() {
            loop(new Runnable() {
                public void run() {
                    react(new Stage0(MyStatefulActor.this));
                }
            });
        }
    }

    private static class Stage0 extends MessagingRunnable<String> {
        private final DefaultActor owner;
        private Stage0(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final String msg) {
            System.out.println("Stage #0: " + msg);
            owner.react(new Stage1(owner));
        }
    }

    private static class Stage1 extends MessagingRunnable<Double> {
        private final DefaultActor owner;
        private Stage1(DefaultActor owner) {this.owner = owner;}

        protected void doRun(final Double msg) {
            System.out.println("  Stage #1: " + msg);
            owner.react(new Stage2());
        }
    }

    private static class Stage2 extends MessagingRunnable<List<Integer>> {
        protected void doRun(final List<Integer> msg) {
            System.out.println("    Stage #2: " + msg + "n");
        }
    }
}

Да, да, я с Вами полностью согласен, Java — крайне многословный язык.

Вот как выглядит диаграмма переходов (развилок по аргументу мы не делали)

// START
// -----  
//   |
//   |
//   |
//   |  +--------+
//   +->| Stage0 | ---String----+
//      +--------+              |
//         ^                    v
//         |                +--------+
//         |                | Stage1 |
//   List<Integer>          +--------+
//         |                    |
//         |  +--------+      Double
//         +--| Stage2 |<-------+
//            +--------+

Таймер

Для решения моей задачи мне будет необходим таймер — нечто, что можно запрограммировать оповестить меня об окончании некоторого промежутка времени. В «обычной» Java мы используем java.util.concurrent.ScheduledThreadPoolExecutor или java.util.Timer на худой конец. Но мы же в мире акторов!
Это Statefull Actor, который висит в ожидании сообщения в методе react() с таймаутом. Если никакое сообщение не приходит в течении этого промежутка времени, то инфраструктура GPars присылает нам сообщение Actor.TIMEOUT (это просто строка «TIMEOUT») и мы «возвращаем» нашему создателю сообщение из конструктора timeoutMsg. Если же вы хотите «выключить» таймер — пришлите ему любое другое сообщение (я буду присылать ему строку «KILL»)

import groovyx.gpars.MessagingRunnable;
import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public class Timer<T> extends DefaultActor {
    private final long timeout;
    private final T timeoutMsg;
    private final MessageStream replyTo;

    public Timer(long timeout, T timeoutMsg, MessageStream replyTo) {
        this.timeout = timeout;
        this.timeoutMsg = timeoutMsg;
        this.replyTo = replyTo;
    }

    protected void act() {
        loop(new Runnable() {
            public void run() {
                react(timeout, MILLISECONDS, new MessagingRunnable() {
                    protected void doRun(Object argument) {
                        if (Actor.TIMEOUT.equals(argument)) {
                            replyTo.send(timeoutMsg);
                        }
                        terminate();
                    }
                });
            }
        });
    }
}

Пример использования таймера.
Я создаю два таймера timerX и timerY, которые с задержкой 1000мс вышлют мне сообщения «X» и «Y» соответственно. Но через 500мс я передумал и «прибил» timerX.

import groovyx.gpars.actor.Actor;
import groovyx.gpars.actor.impl.MessageStream;

public class TimerDemo {
    public static void main(String[] args) throws Exception {
        Actor timerX = new Timer<>(1000, "X", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerX send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        Actor timerY = new Timer<>(1000, "Y", new MessageStream() {
            public MessageStream send(Object msg) {
                System.out.println("timerY send timeout message: '" + msg + "'");
                return this;
            }
        }).start();
        
        Thread.sleep(500);
        timerX.send("KILL");
        
        System.in.read();
    }
}
>> timerY send timeout message: 'Y'

Постановка задачи и схема решения

Рассмотрим следующую весьма общую задачу.
1. У нас есть много потоков, которые достаточно часто вызывают некоторую функцию.
2. У этой функции есть два варианта: обработка одного аргумента и обработка списка аргументов.
3. Эта функция такова, что обработка списка аргументов потребляет меньше ресурсов системы, чем сумма обработок каждого в отдельности.
4. Задача состоит в том, что бы между потоками и функцией поместить некоторый Batcher, который собирает аргументы от потоков в «пачку», передает функции, она обрабатывает список, Batcher «раздает» результаты потокам отправителям.
5. Batcher передает список аргументов в двух случаях: собрали «пачку» достаточного размера или по истечению времени ожидания, в течении которого не удалось собрать полную «пачку», но потокам уже пора возвращать результаты.

Давайте рассмотрим схему решения.
Таймаут 100мс, максимальный размер «пачки» — 3 аргумента

В момент времени 0 поток T-0 посылает аргумент «A». Batcher находится в «чистом» состоянии, поколение 0

//time:0
//
//  T-0 --"A"----->     +-------+ generationId=0
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]

Спустя мгновение Batcher знает, что надо обсчитать «A» и вернуть потоку T-0. Заведен таймер для поколения 0

//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:0.001                          +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A"]
//  T-2                 +-------+ replyToList=[T-0]

В момент времени 25 миллисекунд поток T-1 посылает на обработку «B»

//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:25                             +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1 ---"B"---->     |Batcher| argList=["A"]
//  T-2                 +-------+ replyToList=[T-0]

Спустя мгновение Batcher знает, что надо обсчитать «A» и «B» и вернуть потокам T-0 и T-1

//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:25.001                         +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2                 +-------+ replyToList=[T-0,T-1]

В момент времени 50 миллисекунд поток T-2 посылает на обработку «С»

//                                    +-----+ timeoutMsg=0
//                                    |Timer| timeout=100
//time:50                             +-----+
//
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B"]
//  T-2 ----"C"--->     +-------+ replyToList=[T-0,T-1]

Спустя мгновение Batcher знает, что надо обсчитать «A», «B» и «C» и вернуть потокам T-0, T-1 и T-2. Выясняет, что «пачка» наполнена и «убивает» таймер

//                                    +-----+ timeoutMsg=0
//                          +-"KILL"->|Timer| timeout=100
//time:50.001               |         +-----+
//                          |
//  T-0                 +-------+ generationId=0
//  T-1                 |Batcher| argList=["A","B","C"]
//  T-2                 +-------+ replyToList=[T-0,T-1,T-2]

Спустя мгновение Batcher отдает данные на обсчет в отдельному актору (anonimous), очищает состояние и меняет поколение с 0 на 1

//time:50.002
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ argList=["A","B","C"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                 +---------+

Спустя мгновение (для «раскадровки» буду считать, что вычисления мгновенны) анонимный актор выполняет действие над списком аргументов [«A»,«B»,«C»] -> [«res#A»,«res#B»,«res#C»]

//time:50.003
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ resultList=["res#A","res#B","res#B"]
//                                 |anonymous| replyToList=[T-0,T-1,T-2]
//                                 +---------+

Спустя мгновение анонимный актер раздает результаты вычисления потокам

//time:50.004
//
//  T-0 <-----------+   +-------+ generationId=1
//  T-1 <---------+ |   |Batcher| argList=[]
//  T-2 <-------+ | |   +-------+ replyToList=[]
//              | | |
//              | | +---"res#A"--- +---------+
//              | +---"res#B"----- |anonymous|
//              +--"res#C"-------- +---------+

Спустя мгновение система возвращает в исходное «чистое» состояние

//time:50.005
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]

Позже, в момент времени, 75 поток T-2 передает на вычисление «D»

//time:75
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=[]
//  T-2 ----"D"--->     +-------+ replyToList=[]

Спустя мгновение Batcher знает, что надо обсчитать «D» и вернуть потоку T-2, кроме того запущен таймер для поколения 1

//                                    +-----+ timeoutMsg=1
//                                    |Timer| timeout=100
//time:75.001                         +-----+
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]

Спустя 100мс (в момент времени 175мс) инфраструктура GPars оповещает таймер о истечении периода ожидания

//                                        +--"TIMEOUT"--
//                                        |
//                                        v
//                                    +-----+ timeoutMsg=1
//                                    |Timer| timeout=100
//time:175                            +-----+
//
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]

Спустя мгновение таймер оповещает Batcher о том, что время ожидания поколения 1 истекло и кончает жизнь самоубийством вызывая terminate()

//                                     +-----+ timeoutMsg=1
//                          +----1-----|Timer| timeout=100
//time:175.001              |          +-----+
//                          v
//  T-0                 +-------+ generationId=1
//  T-1                 |Batcher| argList=["D"]
//  T-2                 +-------+ replyToList=[T-2]

Создается анонимный актор, который выполняет вычисления над списком аргументов (в котором всего 1 аргумент). Поколение с 1 меняется на 2

//time:175.002
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ argList=["D"]
//                                 |anonymous| replyToList=[T-2]
//                                 +---------+

Актор выполнил работу

//time:175.003
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]
//
//                                 +---------+ resultList=["res#D"]
//                                 |anonymous| replyToList=[T-2]
//                                 +---------+

Актор отдает результат

//time:175.004
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2 <-------+       +-------+ replyToList=[]
//              |
//              |                  +---------+
//              +--"res#D"----- |anonymous|
//                                 +---------+

Система в исходном «чистом» состоянии

//time:175.005
//
//  T-0                 +-------+ generationId=2
//  T-1                 |Batcher| argList=[]
//  T-2                 +-------+ replyToList=[]

Решение задачи

BatchProcessor — интерфейс «функции». допускающей «пакетный режим» обработки

import java.util.List;

public interface BatchProcessor<ARG, RES> {
    List<RES> onBatch(List<ARG> argList) throws Exception;
}

Batcher — класс, «пакующий» аргументы. Ядро решения

import groovyx.gpars.actor.*;
import groovyx.gpars.actor.impl.MessageStream;

import java.util.*;

public class Batcher<ARG, RES> extends DynamicDispatchActor {
    // fixed parameters
    private final BatchProcessor<ARG, RES> processor;
    private final int maxBatchSize;
    private final long batchWaitTimeout;
    // current state
    private final List<ARG> argList = new ArrayList<>();
    private final List<MessageStream> replyToList = new ArrayList<>();
    private long generationId = 0;
    private Actor lastTimer;

    public Batcher(BatchProcessor<ARG, RES> processor, int maxBatchSize, long batchWaitTimeout) {
        this.processor = processor;
        this.maxBatchSize = maxBatchSize;
        this.batchWaitTimeout = batchWaitTimeout;
    }

    public void onMessage(final ARG elem) {
        argList.add(elem);
        replyToList.add(getSender());
        if (argList.size() == 1) {
            lastTimer = new Timer<>(batchWaitTimeout, ++generationId, this).start();
        } else if (argList.size() == maxBatchSize) {
            lastTimer.send("KILL");
            lastTimer = null;
            nextGeneration();
        }
    }

    public void onMessage(final long timeOutId) {
        if (generationId == timeOutId) {nextGeneration();}
    }

    private void nextGeneration() {
        new DynamicDispatchActor() {
            public void onMessage(final Work<ARG, RES> work) throws Exception {
                List<RES> resultList = work.batcher.onBatch(work.argList);
                for (int k = 0; k < resultList.size(); k++) {
                    work.replyToList.get(k).send(resultList.get(k));
                }
                terminate();
            }
        }.start().send(new Work<>(processor, new ArrayList<>(argList), new ArrayList<>(replyToList)));
        argList.clear();
        replyToList.clear();
        generationId = generationId + 1;
    }

    private static class Work<ARG, RES> {
        public final BatchProcessor<ARG, RES> batcher;
        public final List<ARG> argList;
        public final List<MessageStream> replyToList;

        public Work(BatchProcessor<ARG, RES> batcher, List<ARG> argList, List<MessageStream> replyToList) {
            this.batcher = batcher;
            this.argList = argList;
            this.replyToList = replyToList;
        }
    }
}

BatcherDemo — демонстрация работы класса Batcher. Совпадает со схематичным планом

import groovyx.gpars.actor.Actor;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class BatcherDemo {
    public static final int BATCH_SIZE = 3;
    public static final long BATCH_TIMEOUT = 100;
    
    public static void main(String[] args) throws InterruptedException, IOException {
        final Actor actor = new Batcher<>(new BatchProcessor<String, String>() {
            public List<String> onBatch(List<String> argList) {
                System.out.println("onBatch(" + argList + ")");
                ArrayList<String> result = new ArrayList<>(argList.size());
                for (String arg : argList) {
                    result.add("res#" + arg);
                }
                return result;
            }
        }, BATCH_SIZE, BATCH_TIMEOUT).start();

        ExecutorService exec = newCachedThreadPool();
        exec.submit(new Callable<Void>() { // T-0
            public Void call() throws Exception {
                System.out.println(actor.sendAndWait(("A")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-1
            public Void call() throws Exception {
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("B")));
                return null;
            }
        });
        exec.submit(new Callable<Void>() { // T-2
            public Void call() throws Exception {
                Thread.sleep(50);
                System.out.println(actor.sendAndWait(("C")));
                Thread.sleep(25);
                System.out.println(actor.sendAndWait(("D")));
                return null;
            }
        });        

        exec.shutdown();
    }
}
>> onBatch([A, B, C])
>> res#A
>> res#B
>> res#C
>> onBatch([D])
>> res#D

Заключение

В моем представлении, акторы хороши для программирования многопоточных примитивов, представляющих собой конечные автоматы со сложной диаграммой переходов, которая кроме всего прочего может зависеть от поступающих аргументов.

Некоторые примеры этой статьи являются вариациями кода найденного в сети в различных места, включая gpars.org/guide.

Во второй части мы

  • Измерим скорость работы предложенного решения
  • Ускорим работу с JDBC объединяя запросы различных потоков из отдельных транзакций в одну большую транзакцию RDBMS. То есть сделаем batch не в рамках одного Connection, а между различными Connection-ами.

Автор: IvanGolovach

Источник

Поделиться

* - обязательные к заполнению поля