Багатопотокове програмування в Java 8. Частина третя. Атомарні змінні й конкурентні таблиці


Дізнайтесь більше про нові кар'єрні можливості в EchoUA. Цікаві проекти, ринкова оплата, гарний колектив. Надсилайте резюме та приєднуйтеся до нас.

Розповідає Бенджамін Вінтерберг, Software Engineer


Ласкаво просимо, до третьої частини керівництва по паралельному програмуванню в Java 8. У першій частині ми розглядали, як виконувати код паралельно за допомогою потоків, задач і сервісів-виконавців. У другий розбиралися з тим, як синхронізувати доступ до змінюваних об’єктів за допомогою ключового слова synchronized, блокувань і семафорів. Сьогодні, в завершальній частині, я розповім про дві дуже важливі частини Concurrency API: про атомарні змінні й конкурентні таблиці (Concurrent Maps).

AtomicInteger

Пакет java.concurrent.atomic містить багато корисних класів для виконання атомарних операцій. Операція називається “атомарною” тоді, коли її можна безпечно виконувати при паралельних обчисленнях в декількох потоках, не використовуючи при цьому ні блокувань, ні synchronized, як ми це робили на попередньому уроці.

Усередині атомарні класи дуже активно використовують порівняння з обміном (compare-and-swap, CAS) атомарну інструкцію, яку підтримують більшість сучасних процесорів. Ці інструкції працюють набагато швидше, ніж синхронізація за допомогою блокувань. Тому, якщо Вам просто треба змінювати одну змінну за допомогою декількох потоків, краще вибирати атомарні класи.

Наведу декілька прикладів з використанням AtomicInteger, одного з атомарних класів:

AtomicInteger atomicInt = new AtomicInteger (0);ExecutorService executor = Executors.newFixedThreadPool (2);
IntStream.range (0, 1000) .forEach (i -> executor.submit (atomicInt::incrementAndGet));stop(executor);System.out.println(atomicInt.get()); // => 1000

Як бачите, використання AtomicInteger замість звичайного Integer дозволило нам коректно збільшити число, розподіливши роботу відразу за двома потоками. Ми можемо не турбуватися про безпеку, тому що incrementAndGet () є атомарною операцією.

Клас AtomicInteger підтримує багато різних атомарних операцій. Метод updateAndGet () приймає як аргумент лямбда-вираз і виконує над числом задані арифметичні операції:

AtomicInteger atomicInt = new AtomicInteger (0);ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 1000).forEach (i -> {
Runnable task = () -> atomicInt.updateAndGet (n -> n + 2); executor.submit (task); });stop (executor);System.out.println (atomicInt.get()); // => 2000

Метод accumulateAndGet () приймає лямбда-вираз типу IntBinaryOperator. От як ми можемо використати його, щоб підсумувати всі числа від нуля до тисячі:

AtomicInteger atomicInt = new AtomicInteger (0);ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 1000) .forEach (i -> { Runnable task = () -> atomicInt.accumulateAndGet (i, (n, m) -> n + m); executor.submit (task); });stop (executor);System.out.println (atomicInt.get()); // => 499500

Серед інших атомарних класів хочеться згадати такі, як AtomicBoolean, AtomicLong і AtomicReference.

LongAdder

Клас LongAdder може виступати альтернативою AtomicLong для послідовного складання чисел.

ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 1000) .forEach (i -> executor.submit (adder::increment));stop(executor);System.out.println(adder.sumThenReset()); // => 1000

Так само, як і в інших атомарних чисел, у LongAdder є методи increment () і add (). Проте замість того, щоб складати числа відразу, він просто зберігає у себе набір доданків, щоб зменшити взаємодію між потоками. Дізнатись про результат можна за допомогою виклику sum () чи sumThenReset (). Цей клас використовується в ситуаціях, коли додавати числа доводиться набагато частіше, ніж просити результат (часто це якісь статистичні дослідження, наприклад підрахунок кількості запитів). Нескладно здогадатися, що, даючи приріст у продуктивності, LongAdder вимагає набагато більшої кількості пам’яті через те, що він зберігає всі доданки.

LongAccumulator

Клас LongAccumulator дещо розширює можливості LongAdder. Замість простого складання він обробляє значення, що входять, за допомогою лямбди типу LongBinaryOperator, яка передається при ініціалізації. Виглядає це так:

LongBinaryOperator op =  (x, y) -> 2 * x + y;LongAccumulator accumulator = new LongAccumulator (op, 1l);ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 10) .forEach (i -> executor.submit (() -> accumulator.accumulate (i)));stop (executor);System.out.println (accumulator.getThenReset()); // => 2539

У даному прикладі при кожному виклику accumulate () значення акумулятора збільшується вдвічі, й лише потім підсумовується з i. Так само, як і LongAdder, LongAccumulator зберігає весь набір переданих значень у пам’яті.
Прим. перекл. Насправді, приклад не зовсім коректний; згідно з документацією, LongAccumulator не гарантує порядку виконання операцій. Коректною формулою була б, наприклад x+2*y, оскільки за будь-якого порядку виконання наприкінці виходитиме те саме значення.

ConcurrentMap

Інтерфейс ConcurrentMap наслідує від звичайного Map і надає опис однієї з найкорисніших колекцій для конкурентного використання. Щоб продемонструвати нові методи інтерфейсу, ми використовуватимемо ось цю заготівлю:

ConcurrentMap<String, String> map = new ConcurrentHashMap<>();map.put ("foo", " bar");map.put ("han", " solo");map.put ("r2", "d2");map.put ("c3", "p0");

Метод forEach () приймає лямбду типу BiConsumer. Цій лямбді передаватимуться як аргументи всі ключі й значення таблиці по черзі. Цей метод може використовуватися як заміна for - each циклам з ітерацією по всіх Entry. Ітерація виконується послідовно, в поточному потоці.

map.forEach ((key, value) -> System.out.printf ("%s = %sn", key, value));

Метод putIfAbsent () поміщає в таблицю значення, тільки якщо за цим ключем ще немає іншого значення. Цей метод є потокобезпечним (про крайню міру в реалізації ConcurrentHashMap), тому Вам не треба використовувати synchronized, коли Ви хочете використати його в декількох потоках (те саме справедливе і для звичайного put ()) :

String value = map.putIfAbsent ("c3", "p1");System.out.println (value); // p0

Метод getOrDefault () працює так само, як і звичайний get (), з тією лише різницею, що за відсутності значення за цим ключем він поверне значення за умовчанням, що передається другим аргументом:

String value = map.getOrDefault ("hi", " there");System.out.println (value); // there

Метод replaceAll () приймає як аргумент лямбда-вираз типу BiFunction. Цій лямбді по черзі передаються всі комбінації ключа-значения з карти, а результат, який вона повертає, записується у відповідному ключі як значення:

map.replaceAll ((key, value) -> "r2".equals (key) ? "d3": value);System.out.println (map.get ("r2")); // d3

Якщо Вам треба змінити так само тільки один ключ, це дозволяє зробити метод compute ():

map.compute ("foo", (key, value) -> value + value);System.out.println (map.get ("foo")); // barbar

Крім звичайного compute (), є так само методи computeIfAbsent () і computeIfPresent (). Вони змінюють значення тільки якщо значення за цим ключем немає (чи є, відповідно).

І, нарешті, метод merge (), який може бути використаний для об’єднання наявного ключа з новим значенням. Як аргумент він приймає ключ, нове значення і лямбду, яка визначає, як саме нове значення має бути об’єднане зі старим :

map.merge ("foo", " boo", (oldVal, newVal) -> newVal + " was " + oldVal);System.out.println (map.get ("foo")); // boo was bar

ConcurrentHashMap

Крім методів, описаних у ConcurrencyMap, у ConcurrentHashMap було додано і ще декілька своїх. Так само, як і паралельні stream ‘и, ці методи використовують спеціальний ForkJoinPool, доступний через ForkJoinPool.commonPool () у Java 8. Цей пул використовує свої налаштування для кількості потоків, що ґрунтуються на кількості ядер. У мене їх 4, отже, використовуватимуться три потоки:

System.out.println (ForkJoinPool.getCommonPoolParallelism()); // 3

Це значення може бути спеціально змінене за допомогою параметра JVM:

- Djava.util.concurrent.ForkJoinPool.common.parallelism=5

Ми розглянемо три нові методи: forEach, search and reduce. У кожного з них є перший аргумент, який називається parallelismThreshold, який визначає мінімальну кількість елементів у колекції, при якому операція виконуватиметься в декількох потоках. Якщо в колекції 499 елементів, а перший параметр дорівнює 500, то операція виконуватиметься в одному потоці послідовно. У наших прикладах ми використовуватимемо перший параметр, що дорівнює одиниці, щоб операції завжди виконувалися паралельно.

Для прикладів нижче ми використовуватимемо ту саму таблицю, що і вище (проте оголосимо її ім’ям класу, а не інтерфейсу, щоб нам були доступні всі методи):

ConcurrentHashMap<String, String> map = new ConcurrentHashMap<>();map.put ("foo", "bar");map.put ("han", "solo");map.put ("r2", "d2");map.put ("c3", "p0");

ForEach

Працює метод так само, як і в ConcurrentMap. Для ілюстрації багатопоточності ми виводитимемо назви потоків (не забувайте, що їх кількість для мене обмежена трьома):

map.forEach (1, (key, value) -> System.out.printf ("key: %s; value: %s; thread: %sn", key, value, Thread.currentThread ().getName()));// key: r2; value: d2; thread: main// key: foo; value: bar; thread: ForkJoinPool.commonPool - worker - 1// key: han; value: solo; thread: ForkJoinPool.commonPool - worker - 2// key: c3; value: p0; thread: main

Search

Метод search () приймає лямбда-вирази типу BiFunction, у яку передаються всі пари ключ-значення по черзі. Функція повинна повертати null, якщо необхідне входження знайдене. У разі якщо функція поверне не null, подальший пошук буде зупинений. Не забувайте, що дані в хеш-таблиці зберігаються неврегульовано. Якщо Ви покладатиметеся на порядок, в якому додавали дані в неї, то можете не отримати очікуваного результату. Якщо умовам пошуку задовольняють декілька входжень, результат точно передбачити не можна.

String result = map.search (1, (key, value) -> { System.out.println (Thread.currentThread ().getName()); if ("foo".equals (key)) { return value; } return null;});System.out.println ("Result: " + result);
// ForkJoinPool.commonPool - worker - 2// main// ForkJoinPool.commonPool - worker - 3// Result: bar

Ось інший приклад, який покладається тільки на значення:

String result = map.searchValues (1, value -> { System.out.println (Thread.currentThread ().getName()); if (value.length () > 3) { return value; } return null;});System.out.println ("Result: " + result);// ForkJoinPool.commonPool - worker - 2// main// main// ForkJoinPool.commonPool - worker - 1// Result: solo

Reduce

Метод reduce () Ви могли вже зустрічати в Java 8 Streams. Він приймає дві лямбди типу BiFunction. Перша функція перетворює пару ключ-значення в один об’єкт (будь-якого типу). Друга функція поєднує всі отримані значення в єдиний результат, ігноруючи будь-які можливі null-значения.

tring result = map.reduce (1, (key, value) -> { System.out.println ("Transform: " + Thread.currentThread ().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println ("Reduce: " + Thread.currentThread ().getName()); return s1 + ", " + s2; });System.out.println ("Result: " + result);// Transform: ForkJoinPool.commonPool - worker - 2// Transform: main// Transform: ForkJoinPool.commonPool - worker - 3// Reduce: ForkJoinPool.commonPool - worker - 3// Transform: main
// Reduce: main// Reduce: main// Result: r2=d2, c3=p0, han=solo, foo=bar

На цьому все. Сподіваюся, що мої статті були для Вас корисними.

Переклад статті “Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap”

Київ, Харків, Одеса, Дніпро, Запоріжжя, Кривий Ріг, Вінниця, Херсон, Черкаси, Житомир, Хмельницький, Чернівці, Рівне, Івано-Франківськ, Кременчук, Тернопіль, Луцьк, Ужгород, Кам'янець-Подільський, Стрий - за статистикою саме з цих міст програмісти найбільше переїжджають працювати до Львова. А Ви розглядаєте relocate?


Залишити відповідь

Ваша e-mail адреса не оприлюднюватиметься. Обов’язкові поля позначені *