- PVSM.RU - https://www.pvsm.ru -

Распределенные структуры данных [часть 1, обзорная]

Давным давно, когда деревья компьютеры были большими, а процессоры одноядерными, все приложения запускались в один поток и не испытывали сложностей синхронизации.

Современные же приложения стремятся использовать все имеющиеся ресурсы, в частности, все доступные CPU.

К сожалению, использовать стандартные структуры данных при многопоточной обработке не представляется возможным, поэтому в Java 5 появились потокобезопасные структуры данных,
т.е. функционирующие исправно, при использовании из нескольких потоков одновременно, и расположились они в пакете java.util.concurrent.

Про Vector...

На самом деле, потокобезопасные, но неэффективные, структуры данных, как, например, Vector и Hashtable, появились еще в Java 1.0.
В настоящий момент, они не рекомендуются к использованию.

Однако, не взирая на всю технологическую мощь, заложенную в пакет java.util.concurrent, обработка информации потокобезопасными коллекциями возможна лишь в рамках одного компьютера, а это порождает проблему масштабируемости.

Распределенные структуры данных [часть 1, обзорная] - 1

А что если нужно, в реальном времени, обрабатывать информацию о 100 миллионах клиентов,
когда датасет занимает 100Тб, а каждую секунду нужно совершить 100+ тысяч операций?
Вряд ли это возможно, даже на самом крутом современном железе, а если и возможно — только представьте себе его стоимость!

Намного дешевле добиться такой же вычислительной мощности объединив множество обычных компьютеров в кластер.

Остается лишь вопрос межкомпьютерного взаимодействия привычными средствами, схожими по API с потокобезопасными коллекциями из пакета java.util.concurrent и дающими те же гарантии, но не на одном компьютере, а на всем кластере.

Такие возможности и гарантии могут дать распределенные структуры данных.

Рассмотрим некоторые из распределенных структур данных, позволяющих, без особых сложностей, сделать из многопоточного алгоритма распределенный.

Дисклеймер

Рассматриваемые в дальнейших примерах, реализации распределенных структуры данных являются частью функционала распределенного кеша Apache Ignite [1].

AtomicReference и AtomicLong

IgniteAtomicReference предоставляет compare-and-set [2] семантику.

Предположим, есть 2 компьютера, связаных общей сетью.

Запустим Apache Ignite на обоих (предварительно подключив библиотеки [3])

// Запустим экземпляр (node) Ignite локально.
// В зависимости от конфигурации, node станет частью кластера хранящего и обрабатывающего данные,
// либо клиентской node, позволяющей иметь доступ к этому кластеру.
Ignite ignite = Ignition.ignite();

// Создадим или получим, ранее созданный, IgniteAtomicReference
// со стартовым значением "someVal"
IgniteAtomicReference<String> ref = ignite.atomicReference("refName", "someVal", true);

На обоих компьютерах попробуем изменить хранимое значение

// Изменим значение если текущее соответствует ожидаемому.
boolean res = ref.compareAndSet("someVal", "someNewVal"); 

// Изменение, в рамках кластера Ignite, произойдет.
// Первый вызов изменит значение, и res будет равно true, 
// Второй вызов получит res равное false, т.к. текущее значение уже не равно "someVal"

Восстановим оригинальное значение

ref.compareAndSet("someNewVal", "someVal"); // Изменение произойдет.

IgniteAtomicLong расширяет семантику IgniteAtomicReference добавляя атомарные increment/decrement операции:

// Создадим или получим, ранее созданный, IgniteAtomicLong.
final IgniteAtomicLong atomicLong = ignite.atomicLong("atomicName", 0, true);

// Выведем инкрементированное значение.
System.out.println("Incremented value: " + atomicLong.incrementAndGet());

Подробная документация: https://apacheignite.readme.io/docs/atomic-types [4]

Примеры на github [5]

AtomicSequence

IgniteAtomicSequence позволяет получать уникальный идентификатор, причем уникальность гарантируется в рамках всего кластера.

IgniteAtomicSequence работает быстрее IgniteAtomicLong, т.к. вместо того чтобы синхронизироваться глобально на получении каждого идентификатора, получает сразу диапазон значений и, далее, выдает идентификаторы из этого диапазона.

// Создадим или получим, ранее созданный, IgniteAtomicSequence.
final IgniteAtomicSequence seq = ignite.atomicSequence("seqName", 0, true);

// Получим 20 уникальных идентификаторов.
for (int i = 0; i < 20; i++) {
  long currentValue = seq.get();
  long newValue = seq.incrementAndGet();  
  ...
}

Подробная документация: https://apacheignite.readme.io/docs/id-generator [8]
Пример на github [5]IgniteAtomicSequenceExample [9]

CountDownLatch

IgniteCountDownLatch позволяет синхронизировать потоки на разных компьютерах в рамках одного кластера.

Запустим следующий код на 10 компьютерах одного кластера

// Создадим или получим, ранее созданный, IgniteCountDownLatch
// установив значение счетчика в 10
IgniteCountDownLatch latch = ignite.countDownLatch("latchName", 10, false, true);

// Декрементируем счетчик
latch.countDown();

// Дождемся пока countDown() будет вызван 10 раз
latch.await();

В результате, все latch.await() выполнятся гарантированно позже того, как выполнятся все десять latch.countDown().

Подробная документация: https://apacheignite.readme.io/docs/countdownlatch [10]
Пример на github [5]IgniteCountDownLatchExample [11]

Semaphore

IgniteSemaphore позволяет лимитировать число одновременных действий в рамках одного кластера.

// Создадим или получим, ранее созданный, IgniteSemaphore
// установив значение счетчика в 20
IgniteSemaphore semaphore = ignite.semaphore("semName", 20,  true,  true);

// Получаем разрешение
semaphore.acquire();

try {
    // Семафор захвачен, возможно выполнение кода
}
finally {
    // Возвращаем разрешение
    semaphore.release();
}

Гарантируется, что, одновременно, не более 20 потоков, в рамках одного кластера, будут выполнять код внутри секции try.

Подробная документация: https://apacheignite.readme.io/docs/distributed-semaphore [12]
Пример на github [5]IgniteSemaphoreExample [13]

BlockingQueue

IgniteQueue предоставляет те же возможности, что и BlockingQueue, но в рамках целого кластера.

// Создадим или получим, ранее созданный, IgniteQueue.
IgniteQueue<String> queue = ignite.queue("queueName", 0, colCfg);

Попытаемся получить элемент из очереди

// Получим первй элемент в очереди
queue.take();

Выполнение приостановится на queue.take() до тех пор пока, в рамках того же кластера, не произойдет добавление в очередь

// Добавим объект в очередь
queue.put("data");

Подробная документация: https://apacheignite.readme.io/docs/queue-and-set [14]
Пример на github [5]IgniteQueueExample [15]

Вместо заключения

В связи с тем, что статья получилась исключительно обзорной, а многим, наверняка, интересно как же все это работает под капотом — в следующей статье я рассмотрю особенности реализации каждой из распределенных структур данных описанных в данной статье.

Автор: Антон Виноградов

Источник [16]


Сайт-источник PVSM.RU: https://www.pvsm.ru

Путь до страницы источника: https://www.pvsm.ru/java/256062

Ссылки в тексте:

[1] Apache Ignite: http://ignite.apache.org

[2] compare-and-set: https://ru.wikipedia.org/wiki/%D0%A1%D1%80%D0%B0%D0%B2%D0%BD%D0%B5%D0%BD%D0%B8%D0%B5_%D1%81_%D0%BE%D0%B1%D0%BC%D0%B5%D0%BD%D0%BE%D0%BC

[3] предварительно подключив библиотеки: https://apacheignite.readme.io/docs/getting-started#get-it-with-maven

[4] https://apacheignite.readme.io/docs/atomic-types: https://apacheignite.readme.io/docs/atomic-types

[5] github: https://github.com/apache/ignite

[6] IgniteAtomicReferenceExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicReferenceExample.java

[7] IgniteAtomicLongExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicLongExample.java

[8] https://apacheignite.readme.io/docs/id-generator: https://apacheignite.readme.io/docs/id-generator

[9] IgniteAtomicSequenceExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteAtomicSequenceExample.java

[10] https://apacheignite.readme.io/docs/countdownlatch: https://apacheignite.readme.io/docs/countdownlatch

[11] IgniteCountDownLatchExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteCountDownLatchExample.java

[12] https://apacheignite.readme.io/docs/distributed-semaphore: https://apacheignite.readme.io/docs/distributed-semaphore

[13] IgniteSemaphoreExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteSemaphoreExample.java

[14] https://apacheignite.readme.io/docs/queue-and-set: https://apacheignite.readme.io/docs/queue-and-set

[15] IgniteQueueExample: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datastructures/IgniteQueueExample.java

[16] Источник: https://habrahabr.ru/post/328086/?utm_source=habrahabr&utm_medium=rss&utm_campaign=best