JDK concurrent package

в 12:22, , рубрики: java, java concurrency

Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.

Пакет java.util.concurrent, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:

  • Atomic
  • Locks
  • Collections
  • Synchronization points
  • Executors
  • Accumulators _jdk 1.8_

Atomic

В дочернем пакете java.util.concurrent.atomic находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false.

Для примера возьмем два массива long переменных [1,2,3,4,5] и [-1,-2,-3,-4,-5]. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:

class Sum {
    static monitor = new Object()
    static volatile long sum = 0
}

class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
            synchronized (Sum.monitor) {
                println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                Sum.sum += it
            }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Результат выполнения будет ожидаемым:

pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0

Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:

  • попытка блокирования монитора
  • блокировка потока
  • разблокировка монитора
  • разблокировка потока

Рассмотрим использование AtomicLong для реализации оптимистичной блокировки при расчете этой же суммы:

class Sum {
    static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
    long[] data
    Object call() throws Exception {
        data.each {
                while(true) {
                    long localSum = Sum.sum.get()
                    if (Sum.sum.compareAndSet(localSum, localSum + it)) {
                        println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                        break;
                    } else {
                        println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
                    }
                }
        }
    }
}

Executors.newFixedThreadPool(2).invokeAll([
        new Summer(data: [1,2,3,4,5]),
        new Summer(data: [-1,-2,-3,-4,-5])
])

print("Sum: ${Sum.sum}")

Как видно из результатов «ошибочных» попыток было не так уж и много:

[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0

При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные compare-and-set, и тем чаще придется выполнять это действие повторно.

На основе compare-and-set может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write блокировки вступают в силу, только если проверка версии провалилась.

class Transaction {
    long debit
}

class Account {
    AtomicLong version = new AtomicLong()
    ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
    List<Transaction> transactions = new ArrayList<Transaction>()
}

long  balance(Account account) {
    ReentrantReadWriteLock.ReadLock locked
    while(true) {
        long balance = 0
        long version = account.version.get()
        account.transactions.each {balance += it.debit}
        //volatile write for JMM
        if (account.version.compareAndSet(version, version)) {
            if (locked) {locked.unlock()}
            return balance
        } else {
            locked = account.readWriteLock.readLock()
        }
    }
}

void modifyTransaction(Account account, int position, long newDebit) {
    def writeLock = account.readWriteLock.writeLock()
    account.version.incrementAndGet()
    account.transactions[position].debit = newDebit
    writeLock.unlock()
}
Locks

ReentrantLock

В отличие от syncronized блокировок, ReentrantLock позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock также снабжен этим механизмом.

Несмотря на то, что syncronized и ReentrantLock блокировки очень похожи — реализация на уровне JVM отличается довольно сильно.
Не вдаваясь в подробности JMM: использовать ReentrantLock вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock уступает механизму блокировок JVM.

ReentrantReadWriteLock

Дополняет свойства ReentrantLock возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.

StampedLock _jdk 1.8_

Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):

double distanceFromOriginV1() { // A read-only method
 long stamp;
 if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
   double currentX = x;
   double currentY = y;
   if (sl.validate(stamp))
     return Math.sqrt(currentX * currentX + currentY * currentY);
 }
 stamp = sl.readLock(); // fall back to read lock
 try {
   double currentX = x;
   double currentY = y;
     return Math.sqrt(currentX * currentX + currentY * currentY);
 } finally {
   sl.unlockRead(stamp);
 }
}
Collections

ArrayBlockingQueue

Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (put() take()) и неблокирующие (offer() pool()) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.

ConcurrentHashMap

Ключ-значение структура, основанная на hash функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel степени 2.

ConcurrentSkipListMap

Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.

ConcurrentSkipListSet

ConcurrentSkipListMap без значений.

CopyOnWriteArrayList

Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.

CopyOnWriteArraySet

CopyOnWriteArrayList без значений.

DelayQueue

PriorityBlockingQueue разрешающая получить элемент только после определенной задержки (задержка объявляется через Delayed интерфейс объекта). DelayQueue может быть использована для реализации планировщика. Емкость очереди не фиксирована.

LinkedBlockingDeque

Двунаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedBlockingQueue

Однонаправленная BlockingQueue, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.

LinkedTransferQueue

Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.

PriorityBlockingQueue

Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.

SynchronousQueue

Однонаправленная `BlockingQueue`, реализующая transfer() логику для put() методов.

Synchronization points

CountDownLatch

Барьер (await()), ожидающий конкретного (или больше) кол-ва вызовов countDown(). Состояние барьера не может быть сброшено.

CyclicBarrier

Барьер (await()), ожидающий конкретного кол-ва вызовов await() другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.

Exchanger

Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.

Phaser

Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.

Semaphore

Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.

Executors

ExecutorService пришел на замену new Thread(runnable) чтобы упростить работу с потоками. ExecutorService помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable пул использует интерфейс Callable (умеет возвращать результат и кидать ошибки).

ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
    Object call() throws Exception {
        println("In thread")
        return "From thread"
    }
})
println("From main")
println(future.get())

try {
    pool.submit(new Callable() {
        Object call() throws Exception {
            throw new IllegalStateException()
        }
    }).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}

pool.shutdown()

Метод invokeAll отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny возвращает результат первой успешно выполненной задачи, отменяя все последующие.

ThreadPoolExecutor

Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.

ScheduledThreadPoolExecutor

Расширяет функционал ThreadPoolExecutor возможностью выполнять задачи отложенно или регулярно.

ThreadPoolExecutor

Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.

class LNode {
    List<LNode> childs = []
    def object
}

class Finder extends RecursiveTask<LNode> {
    LNode  node
    Object expect

    protected LNode compute() {
        if (node?.object?.equals(expect)) {
            return node
        }
        node?.childs?.collect {
            new Finder(node: it, expect: expect).fork()
        }?.collect {
            it.join()
        }?.find {
            it != null
        }
    }
}

ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
        node: new LNode(
                childs: [
                        new LNode(object: "ivalid"),
                        new LNode(
                                object: "ivalid",
                                childs: [new LNode(object: "test")]
                        )
                ]
        ),
        expect: "test"
))

print("${invoke?.object}")
Accumulators _jdk 1.8_

Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.

Автор: fls_welvet

Источник


* - обязательные к заполнению поля


https://ajax.googleapis.com/ajax/libs/jquery/3.4.1/jquery.min.js