Легковесные потоки в Java

в 9:01, , рубрики: concurrency, java, параллельное программирование

Многопоточная модель программирования предоставляет удобную абстракцию для разработки параллельных программ. К сожалению, большие накладные расходы потоков операционной системы на память и переключение контекстов сильно ограничивают их применение. Легковесные (прикладные, пользовательские) потоки не имеют таких проблем, так как требуют значительно меньше памяти и гораздо меньше нагружают процессор при переключении контекстов, что позволяет запустить большое количество таких потоков в приложении. Также легковесные потоки позволяют разрабатывать асинхронные приложения без использования обратных вызовов, что делает код чище и проще для понимания.

Потоки Java соответствуют потокам ядра и поэтому обладают всеми присущими им недостатками. В сети можно найти проекты, целью которых является отказ от потоков Java и реализация пользовательских потоков. Самые известные перечислены ниже.

Kilim — один из первых «рабочих» проектов, реализующих легковесные потоки. Библиотека предоставляет средства для создания приложений, основанных на обмене сообщениями. Из-за соответствующего API данную библиотеку можно рассматривать скорее как реализующую модель акторов, чем потоковую модель.

Quasar — другой проект, реализующий прикладные потоки, называемые нитями (fibers). Кроме легковесных потоков библиотека предоставляет построенную на нитях реализацию модели акторов. Хотя API нитей похож на API потоков Java, чтобы воспользоваться средствами библиотеки, потребуется переписать код приложения.

В данной статье рассматривается проект Zephyr. Его отличие от первых двух проектов заключается в том, что средства библиотеки позволяют «превратить» обычные потоки в легковесные, не изменяя кода приложения. В действительности библиотека позволяет использовать любую реализацию потоков, и легковесные потоки являются одной из возможных реализаций.

Пример

Рассмотрим классическую задачу для оценки производительности потоков. По условиям задачи создаются 503 потока, объединенные в кольцо. Некоторое сообщение передается первому потоку и далее от потока к потоку N раз. Последний поток, получивший сообщение, печатает свой номер.

Ниже приведен исходный код задачи и POM-файл с плагином для запуска.

srcmainjavaorgexamplethreadringThreadRing.java

package org.example.threadring;

import java.util.concurrent.locks.LockSupport;

public class ThreadRing {

    public static void main(String[] args) throws InterruptedException {
        int threadCount = Integer.parseInt(args[0]);
        int n = Integer.parseInt(args[1]);

        WorkerThread[] threads = new WorkerThread[threadCount];
        WorkerThread first = new WorkerThread(1);
        WorkerThread next = first;

        for (int i = threadCount - 1; i > 0; i--) {
            WorkerThread thread = new WorkerThread(i + 1);
            threads[i] = thread;
            thread.next = next;
            thread.waiting = true;
            thread.start();
            next = thread;
        }

        threads[0] = first;
        first.next = next;
        first.message = n;
        first.waiting = false;
        first.start();

        for (WorkerThread thread : threads) {
            thread.join();
        }
    }

    private static final class WorkerThread extends Thread {

        private final int id;
        WorkerThread next;
        int message;
        volatile boolean waiting;

        WorkerThread(int id) {
            this.id = id;
        }

        @Override
        public void run() {
            int m;
            do {
                while (waiting) {
                    LockSupport.park();
                }
                m = message;
                waiting = true;
                next.message = m - 1;
                next.waiting = false;
                LockSupport.unpark(next);
            } while (m > 0);
            if (m == 0) {
                System.out.println(id);
            }
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.threadring</groupId>
    <artifactId>threadring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.4.0</version>
                <configuration>
                    <mainClass>org.example.threadring.ThreadRing</mainClass>
                    <arguments>
                        <argument>503</argument>
                        <argument>50000000</argument>
                    </arguments>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>java</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Собираем проект и запускаем.

mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building threadring 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ threadring ---
292
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1:45.475s

Теперь изменим POM-файл, добавив поддержку легковесных потоков.

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example.threadring</groupId>
    <artifactId>threadring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <zephyr.version>1.0-SNAPSHOT</zephyr.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.jvnet.zephyr.maven</groupId>
                <artifactId>remapping-maven-plugin</artifactId>
                <version>1.0-SNAPSHOT</version>
                <configuration>
                    <mappingEntries>
                        <mappingEntry>
                            <oldName>java/lang/Thread</oldName>
                            <newName>org/jvnet/zephyr/jcl/java/lang/Thread</newName>
                        </mappingEntry>
                        <mappingEntry>
                            <oldName>java/util/concurrent/locks/LockSupport</oldName>
                            <newName>org/jvnet/zephyr/jcl/java/util/concurrent/locks/LockSupport</newName>
                        </mappingEntry>
                    </mappingEntries>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>remapping</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.jvnet.zephyr.maven</groupId>
                <artifactId>javaflow-maven-plugin</artifactId>
                <version>1.0-SNAPSHOT</version>
                <dependencies>
                    <dependency>
                        <groupId>org.jvnet.zephyr.thread</groupId>
                        <artifactId>thread-api</artifactId>
                        <version>${zephyr.version}</version>
                    </dependency>
                    <dependency>
                        <groupId>org.jvnet.zephyr.jcl</groupId>
                        <artifactId>jcl-jdk7</artifactId>
                        <version>${zephyr.version}</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <classesDirectory>${project.build.directory}/remapping-classes</classesDirectory>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>javaflow</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.5</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                        <configuration>
                            <classifier>javaflow</classifier>
                            <classesDirectory>${project.build.directory}/javaflow-classes</classesDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.4.0</version>
                <dependencies>
                    <dependency>
                        <groupId>org.example.threadring</groupId>
                        <artifactId>threadring</artifactId>
                        <version>${project.version}</version>
                        <classifier>javaflow</classifier>
                    </dependency>
                    <dependency>
                        <groupId>org.jvnet.zephyr.continuation</groupId>
                        <artifactId>continuation-javaflow</artifactId>
                        <version>${zephyr.version}</version>
                    </dependency>
                    <dependency>
                        <groupId>org.jvnet.zephyr.thread</groupId>
                        <artifactId>thread-api</artifactId>
                        <version>${zephyr.version}</version>
                        <classifier>javaflow</classifier>
                    </dependency>
                    <dependency>
                        <groupId>org.jvnet.zephyr.thread</groupId>
                        <artifactId>thread-continuation</artifactId>
                        <version>${zephyr.version}</version>
                        <classifier>javaflow</classifier>
                    </dependency>
                    <dependency>
                        <groupId>org.jvnet.zephyr.jcl</groupId>
                        <artifactId>jcl-jdk7</artifactId>
                        <version>${zephyr.version}</version>
                        <classifier>javaflow</classifier>
                    </dependency>
                </dependencies>
                <configuration>
                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
                    <includeProjectDependencies>false</includeProjectDependencies>
                    <includePluginDependencies>true</includePluginDependencies>
                    <mainClass>org.example.threadring.ThreadRing</mainClass>
                    <arguments>
                        <argument>503</argument>
                        <argument>50000000</argument>
                    </arguments>
                    <systemProperties>
                        <systemProperty>
                            <key>org.jvnet.zephyr.thread.continuation.DefaultForkJoinPoolExecutor.parallelism</key>
                            <value>4</value>
                        </systemProperty>
                    </systemProperties>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>java</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Плагин remapping-maven-plugin обрабатывает class-файлы, переназначая типы. В данном примере все ссылки на Thread и LockSupport заменяются на классы, реализующие поддержку легковесных потоков. Поскольку новые классы имеют такой же интерфейс как и оригинальные, менять код приложение не требуется. По умолчанию преобразованные файлы копируются в директорию targetremapping-classes.

Плагин javaflow-maven-plugin трансформирует class-файлы, добавляя в методы поддержку продолжений (continuations), реализуемых библиотекой Commons Javaflow. Есть возможность исключить из обработки как целые классы (excludes), так и отдельные методы (excludedMethods). Файлы, обработанные плагином remapping-maven-plugin, являются исходными для javaflow-maven-plugin. Так как remapping-maven-plugin и javaflow-maven-plugin по умолчанию имеют один и тот же жизненный цикл (process-classes), то плагины запускаются в том порядке, в котором они указаны в POM-файле. В связи с тем что javaflow-maven-plugin в процессе работы загружает классы, на которые ссылаются трансформируемые class-файлы, необходимо указать соответствующие зависимости плагина. По умолчанию файлы после трансформации помещаются в директорию targetjavaflow-classes.

Плагин maven-jar-plugin собирает jar-файл, использую файлы, полученные после обработки плагином javaflow-maven-plugin, и помечает его классификатором javaflow.

Новая версия приложения зависит от классов, реализующих поддержку легковесных потоков, но поскольку данные классы не нужны на стадии компиляции, соответствующие зависимости добавлены в плагин exec-maven-plugin.

Собираем и запускаем.

mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building threadring 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ threadring ---
292
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 24.061s

Как видим, новая версия приложения работает значительно быстрее оригинальной.

Реализация

Суть метода, который позволяет выполнить переход от обычных потоков к легковесным не меняя исходного кода приложения, заключается в переназначении на уровне байт кода стандартных классов на классы с таким же интерфейсом, но с другой реализацией. В примере, приведенном выше, переназначаются два класса: Thread и LockSupport. Новые классы делегируют вызовы своих методов некоторой реализации ThreadImpl. При конструировании экземпляра класса потока создание конкретной реализации запрашивается у провайдера ThreadImplProvider. Класс ThreadImplProvider абстрактный, и конкретный провайдер загружается через средство загрузки сервисов ServiceLoader. Все это позволяет присоединять к потокам произвольную реализацию, осуществляя идею подключаемых потоков.

На текущий момент есть две реализации потоков: JavaThreadImpl и ContinuationThreadImpl. Класс JavaThreadImpl делегирует вызовы своих методов обратно методам класса java.lang.Thread. Поток с этой реализацией доступен по умолчанию при вызове Thread.currentThread. Это необходимо, для того чтобы классы, которые зависят от новых Thread и LockSupport, могли работать вне подключаемых потоков (например, в методе main). Класс ContinuationThreadImpl реализует легковесные потоки через продолжения, и в качестве планировщика по умолчанию использует ForkJoinPool. Абстрактный класс Continuation, конкретная реализация которого создается при помощи провайдера ContinuationProvider, который в свою очередь загружается через ServiceLoader, содержит методы приостановки выполнения (suspend) и возобновления выполнения продолжения (resume). Класс ContinuationThreadImpl возобновляет работу легковесного потока, отправляя на исполнение в пул потоков задачу, в которой возобновляется выполнение приостановленного продолжения. Это происходит при вызове методов unpark и yield и после завершения ожидания, которое инициируют методы parkNanos, parkUntil, sleep и join. При вызове методов park, sleep и join продолжение приостанавливается, задача завершается, и поток переводится в состояние ожидания.

Пока доступна только одна реализация продолжении, основанная на слегка измененной версии библиотеки Commons Javaflow. Javaflow позволяет включить поддержку продолжений во все методы, кроме статических конструкторов и конструкторов экземпляра, также не поддерживаются встроенные блокировки. Это означает, что приложение будет работать некорректно в том случае, когда приостанавливается метод, вызванный из конструктора или блока синхронизации. В проекте предусмотрена возможность обойти данное ограничение. С помощью методов managed и manage класса ThreadUtils можно сделать методы, вызываемые из определенного места в коде, неприостанавливаемым. Другими словами, эти методы начинают работать так, как если бы они выполнялись в обычном потоке. При таком подходе приложение будет работать корректно, хотя могут возникнуть нежелательные блокировки потоков.

Помимо классов Thread и LockSupport поддержка подключаемых (и соответственно легковесных) потоков реализована в ThreadLocal, во многих классах из пакетов java.util.concurrent и java.util.concurrent.lock (параллельные коллекции, блокировки, барьеры, семафоры и др.). Классы ограниченных (bounded) параллельных коллекций с поддержкой подключаемых потоков в сочетании с легковесными потоками позволяют создавать асинхронные приложения, в которых нет такой проблемы, когда несколько поставщиков (producers), передающих данные через ограниченный буфер, и медленных потребителей (consumers), исполняющихся на пуле потоков, замедляют или блокируют работу всей системы. Также реализован сервис-провайдер SelectorProvider каналов NIO с поддержкой подключаемых потоков. Синхронный API каналов в контексте легковесных потоков позволяет писать понятный код без селекторов и обратных вызовов.

Тест производительности

Ниже приведены результаты теста thread ring для сравнения производительности потоков Java, акторов Akka, нитей Quasar и легковесных потоков Zephyr. Для тестирования потоков Zephyr используется ForkJoinPool из Quasar, так как он отличается от стандартного ForkJoinPool, который по умолчанию используется в Zephyr.

Командная строка для запуска теста:

java -jar zephyr-benchmark.jar -jvmArgsAppend "-DworkerCount=503 -DringSize=1000000 -javaagent:quasar-core-0.6.2.jar -Dorg.jvnet.zephyr.thread.continuation.ContinuationThreadImplProvider.executor=org.jvnet.zephyr.benchmark.Jsr166ForkJoinPoolExecutor -Dakka.actor.default-dispatcher.fork-join-executor.parallelism=4 -Dco.paralleluniverse.fibers.DefaultFiberPool.parallelism=4 -Dorg.jvnet.zephyr.benchmark.Jsr166ForkJoinPoolExecutor.parallelism=4" -wi 5 -i 10 -bm avgt -tu ms -f 5 ".*RingBenchmark.*"

Результаты теста для N = 1000000 и parallelism = 4:

Benchmark                                      Mode  Samples     Score   Error  Units
o.j.z.b.AkkaActorRingBenchmark.benchmark       avgt       50   261.483 ± 4.116  ms/op
o.j.z.b.JavaThreadRingBenchmark.benchmark      avgt       50  2123.138 ± 7.711  ms/op
o.j.z.b.QuasarFiberRingBenchmark.benchmark     avgt       50   357.528 ± 7.201  ms/op
o.j.z.b.ZephyrThreadRingBenchmark.benchmark    avgt       50   258.556 ± 3.687  ms/op

Исходный код теста доступен по ссылке.

Автор: yngui

Источник

Поделиться новостью

* - обязательные к заполнению поля