- PVSM.RU - https://www.pvsm.ru -
Модель памяти, существующая на данный момент в Java, гарантирует ожидаемый порядок выполнения многопоточного кода, при отсутствии в этом коде гонок потоков. И для того, чтобы обезопасить ваш код от гонок, придуманы различные способы синхронизации и обмена данными между ними.
Пакет java.util.concurrent
, входящий в состав HotSpot JDK, предоставляет следующие инструменты для написания многопоточного кода:
В дочернем пакете java.util.concurrent.atomic
находится набор классов для атомарной работы с примитивными типами. Контракт данных классов гарантирует выполнение операции compare-and-set
за «1 единицу процессорного времени». При установке нового значения этой переменной вы также передаете ее старое значение (подход оптимистичной блокировки). Если с момента вызова метода значение переменной отличается от ожидаемого — результатом выполнения будет false
.
Для примера возьмем два массива long
переменных [1,2,3,4,5]
и [-1,-2,-3,-4,-5]
. Каждый из потоков будет последовательно итерироваться по массиву и суммировать элементы в единую переменную. Код (groovy) с пессимистичной блокировкой выглядит так:
class Sum {
static monitor = new Object()
static volatile long sum = 0
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
synchronized (Sum.monitor) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
Sum.sum += it
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Результат выполнения будет ожидаемым:
pool-1-thread-1: add 1 to 0
pool-1-thread-2: add -1 to 1
pool-1-thread-1: add 2 to 0
pool-1-thread-2: add -2 to 2
pool-1-thread-1: add 3 to 0
pool-1-thread-2: add -3 to 3
pool-1-thread-1: add 4 to 0
pool-1-thread-1: add 5 to 4
pool-1-thread-2: add -4 to 9
pool-1-thread-2: add -5 to 5
Sum: 0
Однако такой подход имеет существенные недостатки по производительности. В данном случае на бесполезную для нас работу, уходит больше ресурсов, чем на полезную:
Рассмотрим использование AtomicLong
для реализации оптимистичной блокировки при расчете этой же суммы:
class Sum {
static volatile AtomicLong sum = new AtomicLong(0)
}
class Summer implements Callable {
long[] data
Object call() throws Exception {
data.each {
while(true) {
long localSum = Sum.sum.get()
if (Sum.sum.compareAndSet(localSum, localSum + it)) {
println("${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
break;
} else {
println("[MISS!] ${Thread.currentThread().name}: add ${it} to ${Sum.sum}")
}
}
}
}
}
Executors.newFixedThreadPool(2).invokeAll([
new Summer(data: [1,2,3,4,5]),
new Summer(data: [-1,-2,-3,-4,-5])
])
print("Sum: ${Sum.sum}")
Как видно из результатов «ошибочных» попыток было не так уж и много:
[MISS!] pool-1-thread-1: add 1 to -1
pool-1-thread-2: add -1 to -1
pool-1-thread-2: add -2 to -3
[MISS!] pool-1-thread-1: add 1 to -3
pool-1-thread-2: add -3 to -6
pool-1-thread-1: add 1 to -5
[MISS!] pool-1-thread-2: add -4 to -5
pool-1-thread-1: add 2 to -7
pool-1-thread-2: add -4 to -7
pool-1-thread-1: add 3 to -9
pool-1-thread-2: add -5 to -9
pool-1-thread-1: add 4 to -5
pool-1-thread-1: add 5 to 0
Sum: 0
При решении использовать оптимистичную блокировку важно, чтобы действие с модифицируемой переменной не занимало много времени. Чем дольше это действие — тем чаще будут случаться ошибочные compare-and-set
, и тем чаще придется выполнять это действие повторно.
На основе compare-and-set
может также реализовываться неблокирующая read блокировка. В данном случае в atomic переменной будет храниться версия обрабатываемого объекта. Получив значение версии до вычислений мы можем сверить ее после вычисления. Обычные read-write
блокировки вступают в силу, только если проверка версии провалилась.
class Transaction {
long debit
}
class Account {
AtomicLong version = new AtomicLong()
ReadWriteLock readWriteLock = new ReentrantReadWriteLock()
List<Transaction> transactions = new ArrayList<Transaction>()
}
long balance(Account account) {
ReentrantReadWriteLock.ReadLock locked
while(true) {
long balance = 0
long version = account.version.get()
account.transactions.each {balance += it.debit}
//volatile write for JMM
if (account.version.compareAndSet(version, version)) {
if (locked) {locked.unlock()}
return balance
} else {
locked = account.readWriteLock.readLock()
}
}
}
void modifyTransaction(Account account, int position, long newDebit) {
def writeLock = account.readWriteLock.writeLock()
account.version.incrementAndGet()
account.transactions[position].debit = newDebit
writeLock.unlock()
}
В отличие от syncronized блокировок, ReentrantLock
позволяет более гибко выбирать моменты снятия и получения блокировки т.к. использует обычные Java вызовы. Также ReentrantLock
позволяет получить информацию о текущем состоянии блокировки, разрешает «ожидать» блокировку в течение определенного времени. Поддерживает правильное рекурсивное получение и освобождение блокировки для одного потока. Если вам необходимы честные блокировки (соблюдающие очередность при захвате монитора) — ReentrantLock
также снабжен этим механизмом.
Несмотря на то, что syncronized
и ReentrantLock
блокировки очень похожи — реализация на уровне JVM отличается довольно сильно.
Не вдаваясь в подробности JMM: использовать ReentrantLock
вместо предоставляемой JVM syncronized блокировки стоит только в том случае, если у вас очень часто происходит битва потоков за монитор. В случае, когда в syncronized метод _обычно_ попадает лишь один поток — производительность ReentrantLock
уступает механизму блокировок JVM.
Дополняет свойства ReentrantLock
возможностью захватывать множество блокировок на чтение и блокировку на запись. Блокировка на запись может быть «опущена» до блокировки на чтение, если это необходимо.
Реализовывает оптимистичные и пессимистичные блокировки на чтение-запись с возможностью их дальнейшего увеличения или уменьшения. Оптимистичная блокировка реализуется через «штамп» лока (javadoc):
double distanceFromOriginV1() { // A read-only method
long stamp;
if ((stamp = sl.tryOptimisticRead()) != 0L) { // optimistic
double currentX = x;
double currentY = y;
if (sl.validate(stamp))
return Math.sqrt(currentX * currentX + currentY * currentY);
}
stamp = sl.readLock(); // fall back to read lock
try {
double currentX = x;
double currentY = y;
return Math.sqrt(currentX * currentX + currentY * currentY);
} finally {
sl.unlockRead(stamp);
}
}
Честная очередь для передачи сообщения из одного потока в другой. Поддерживает блокирующие (put()
take()
) и неблокирующие (offer()
pool()
) методы. Запрещает null значения. Емкость очереди должна быть указанна при создании.
Ключ-значение структура, основанная на hash
функции. Отсутствуют блокировки на чтение. При записи блокируется только часть карты (сегмент). Кол-во сегментов ограничено ближайшей к concurrencyLevel
степени 2.
Сбалансированная многопоточная ключ-значение структура (O(log n)). Поиск основан на списке с пропусками. Карта должна иметь возможность сравнивать ключи.
ConcurrentSkipListMap
без значений.
Блокирующий на запись, не блокирующий на чтение список. Любая модификация создает новый экземпляр массива в памяти.
CopyOnWriteArrayList
без значений.
PriorityBlockingQueue
разрешающая получить элемент только после определенной задержки (задержка объявляется через Delayed
интерфейс объекта). DelayQueue
может быть использована для реализации планировщика. Емкость очереди не фиксирована.
Двунаправленная BlockingQueue
, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.
Однонаправленная BlockingQueue
, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована.
Однонаправленная `BlockingQueue`, основанная на связанности (cache-miss & cache coherence overhead). Емкость очереди не фиксирована. Данная очередь позволяет ожидать когда элемент «заберет» обработчик.
Однонаправленная `BlockingQueue`, разрешающая приоритизировать сообщения (через сравнение элементов). Запрещает null значения.
Однонаправленная `BlockingQueue`, реализующая transfer()
логику для put()
методов.
Барьер (await()
), ожидающий конкретного (или больше) кол-ва вызовов countDown()
. Состояние барьера не может быть сброшено.
Барьер (await()
), ожидающий конкретного кол-ва вызовов await()
другими потоками. Когда кол-во потоков достигнет указанного будет вызван опциональный callback и блокировка снимется. Барьер сбрасывает свое состояние в начальное при освобождении ожидающих потоков и может быть использован повторно.
Барьер (`exchange()`) для синхронизации двух потоков. В момент синхронизации возможна volatile передача объектов между потоками.
Расширение `CyclicBarrier`, позволяющая регистрировать и удалять участников на каждый цикл барьера.
Барьер, разрешающий только указанному кол-во потоков захватить монитор. По сути расширяет функционал `Lock` возможность находиться в блоке нескольким потокам.
ExecutorService
пришел на замену new Thread(runnable)
чтобы упростить работу с потоками. ExecutorService
помогает повторно использовать освободившиеся потоки, организовывать очереди из задач для пула потоков, подписываться на результат выполнения задачи. Вместо интерфейса Runnable
пул использует интерфейс Callable
(умеет возвращать результат и кидать ошибки).
ExecutorService pool = Executors.newFixedThreadPool(4)
Future future = pool.submit(new Callable() {
Object call() throws Exception {
println("In thread")
return "From thread"
}
})
println("From main")
println(future.get())
try {
pool.submit(new Callable() {
Object call() throws Exception {
throw new IllegalStateException()
}
}).get()
} catch (ExecutionException e) {println("Got it: ${e.cause}")}
pool.shutdown()
Метод invokeAll
отдает управление вызвавшему потоку только по завершению всех задач. Метод invokeAny
возвращает результат первой успешно выполненной задачи, отменяя все последующие.
Пул потоков с возможностью указывать рабочее и максимальное кол-во потоков в пуле, очередь для задач.
Расширяет функционал ThreadPoolExecutor
возможностью выполнять задачи отложенно или регулярно.
Более легкий пул потоков для «самовоспроизводящих» задач. Пул ожидает вызовов `fork()` и `join()` методов у дочерних задач в родительской.
class LNode {
List<LNode> childs = []
def object
}
class Finder extends RecursiveTask<LNode> {
LNode node
Object expect
protected LNode compute() {
if (node?.object?.equals(expect)) {
return node
}
node?.childs?.collect {
new Finder(node: it, expect: expect).fork()
}?.collect {
it.join()
}?.find {
it != null
}
}
}
ForkJoinPool es = new ForkJoinPool()
def invoke = es.invoke(new Finder(
node: new LNode(
childs: [
new LNode(object: "ivalid"),
new LNode(
object: "ivalid",
childs: [new LNode(object: "test")]
)
]
),
expect: "test"
))
print("${invoke?.object}")
Аккумуляторы позволяют выполнять примитивные операции (сумма/поиск максимального значения) над числовыми элементами в многопоточной среде без использования CAS.
Автор: fls_welvet
Источник [1]
Сайт-источник PVSM.RU: https://www.pvsm.ru
Путь до страницы источника: https://www.pvsm.ru/java/39546
Ссылки в тексте:
[1] Источник: http://habrahabr.ru/post/187854/
Нажмите здесь для печати.