Багатопотокове програмування в Java 8. Частина друга. Синхронізація доступу до змінюваних об’єктів


Дізнайтесь більше про нові кар'єрні можливості в EchoUA. Цікаві проекти, ринкова оплата, гарний колектив. Надсилайте резюме та приєднуйтеся до нас.

Розповідає Бенджамін Вінтерберг, Software Engineer


Ласкаво просимо, до другої частини керівництва по паралельному програмуванню у Java 8. У попередній частині ми розглядали, як виконувати код паралельно за допомогою потоків, задач і сервісів виконавців. Сьогодні ми розберемо, як синхронізувати доступ до змінюваних об’єктів за допомогою ключового слова synchronized, блокувань і семафорів.

Більшість принципів, описаних у цій статті, справедливі й для старіших версій Java. Проте, я не мав метою вивчення проблем сумісності й розгляд прикладів, які містять як лямбди, так і нові можливості багатопотоковості. Якщо Ви поверхово ознайомлені з лямбда-виразами, то Вам варто спершу прочитати мій туторіал з Java 8.

Для простоти прикладів я використовую в них двох метод-помічників: sleep (секунди) і stop (сервіс-виконавець). Їх реалізації я виклав на GitHub, якщо комусь цікаво.

Синхронізація

Ми вже дізналися, як виконувати код паралельно за допомогою сервісів-виконавців (ExecutorService). Під час написання багатопотокової програми треба приділяти особливу увагу роботі із загальними для потоків змінюваними об’єктами. Припустимо, що ми хочемо збільшити таку змінну на одиницю.

Ми створюємо поле count і метод increment (), який збільшує count на одиницю:

int count = 0;void increment (){ count = count + 1;}

Якщо ми викликатимемо цей метод одночасно з двох потоків, у нас виникнуть серйозні проблеми:

ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 10000) .forEach (i -> executor.submit (this::increment));stop(executor);
System.out.println (count); // 9965

Замість очікуваного постійного результату 10000 ми щоразу отримуватимемо різні числа. Причина цього використання змінюваної змінної декількома потоками без синхронізації, що викликає стан гонитви (race condition).

Збільшення числа на одиницю відбувається в три кроки: (1) обчислити значення змінної, (2) збільшити це значення на одиницю і (3) записати назад нове значення. Якщо два потоки одночасно виконуватимуть ці кроки, то цілком імовірно, що вони можуть виконати перший крок одночасно, прочитавши те саме значення. Потім вони запишуть у змінну те саме значення, і замість збільшення на 2 вийде збільшення на одиницю. Внаслідок цього кінцеве значення і виходить менше за очікуване.

На щастя, Java підтримує синхронізацію потоків з ранніх версій, використовуючи для цього ключове слово synchronized. От як слід було б переписати наш код:

synchronized void incrementSync (){ count = count + 1;}

Тоді, після виконання методу 10000 разів, ми завжди набуватимемо значення 10000, і жодної гонитви станів виникати не буде:

ExecutorService executor = Executors.newFixedThreadPool (2);IntStream.range (0, 10000) .forEach (i -> executor.submit (this::incrementSync));stop(executor);System.out.println(count); // 10000

Це ключове слово можна застосовувати не лише до методів, але й до окремих їх блоків:

void incrementSync (){ synchronized (this){ count = count + 1; }}

Блокування

Крім використання блокувань неявно (за допомогою ключового слова synchronized), Concrurrency API пропонує багато способів їх явного використання, визначених інтерфейсом Lock. За допомогою явних блокувань можна налаштувати роботу програми набагато тонше і тим самим зробити її ефективніше.

Стандартний JDK надає багато реалізацій Lock, які ми зараз і розглянемо.

ReentrantLock

Клас ReentrantLock реалізує ту саму поведінку, що й звичайні неявні блокування. Давайте спробуємо переписати наш приклад зі збільшенням на одиницю:

ReentrantLock lock = new ReentrantLock ();int count = 0;void increment (){ lock.lock (); try { count++; } finally { lock.unlock (); }}

Блокування здійснюється за допомогою методу lock (), а звільняються ресурси застосуванням методу unlock (). Дуже важливо обертати код try{}finally{}, щоб ресурси звільнилися навіть у разі виведення винятку. Код, представлений вище, так само потокобезпечний, як і його аналог із synchronized. Якщо один потік викликав lock (), а інший потік намагається отримати доступ до методу перед викликом unlock (), то другий потік простоюватиме до тих пір, поки метод не звільниться. Тільки один потік може утримувати блокування в кожен момент часу.

Для більшого контролю явні блокування підтримують багато спеціальних методів:

ExecutorService executor = Executors.newFixedThreadPool (2);ReentrantLock lock = new ReentrantLock ();executor.submit (() -> {
lock.lock (); try { sleep (1); } finally { lock.unlock (); }});executor.submit (() -> { System.out.println ("Locked: " + lock.isLocked()); System.out.println ("Held by me: " + lock.isHeldByCurrentThread()); boolean locked = lock.tryLock (); System.out.println ("Lock acquired: " + locked);});stop (executor);

Поки перший потік утримує блокування, другий виведе наступну інформацію:

Locked: true
Held by me: false
Lock acquired: false

Метод tryLock (), на відміну від звичайного lock () не зупиняє поточний потік у разі, якщо ресурс вже зайнятий. Він повертає булевий результат, який варто перевірити перед тим, як намагатися виконувати якісь дії із загальними об’єктами (істина означає, що контроль над ресурсами захопити вдалося).

ReadWriteLock

Інтерфейс ReadWriteLock пропонує інший тип блокувань – окремі для читання, і  для запису. Цей інтерфейс був доданий з міркування, що прочитувати дані (будь-якій кількості потоків) безпечно до тих пір, поки жоден з них не змінює змінну. Отже, блокування для читання (read- ock) може утримувати будь-яку кількість потоків до тих пір, поки не утримує блокування для запису (write-lock). Такий підхід збільшить продуктивність у разі, коли читання використовується набагато частіше, ніж запис.

ExecutorService executor = Executors.newFixedThreadPool (2);Map<String, String> map = new HashMap<>();ReadWriteLock lock = new ReentrantReadWriteLock ();executor.submit (() -> { lock.writeLock ().lock (); try { sleep (1); map.put (" foo", " bar");

Якщо Ви спробуєте запустити цей приклад, то помітите, що обидва потоки, створені для читання, простоюватимуть секунду, чекаючи завершення роботи потоку для запису. Після зняття блокування вони виконаються паралельно, і одночасно запишуть результат у консоль. Їм не треба чекати на завершення роботи один одного, тому що виконувати одночасне читання цілком безпечно (до тих пір, поки жоден потік не працює паралельно на запис).

StampedLock

 

У Java 8 з’явився новий тип блокувань – StampedLock. Так само, як і в попередніх прикладах, він підтримує розділення на readLock () і writeLock (). Проте, на відміну від ReadWriteLock, метод блокування StampedLock повертає “штамп” – значення типу long. Цей штамп може використовуватися надалі як для вивільнення ресурсів, так і для перевірки стану блокування. На додаток, у цього класу є методи, для реалізації “оптимістичного” блокування. Розглянемо все за порядком.

Ось так, слід було переписати наш попередній приклад під використання StampedLock:

ExecutorService executor = Executors.newFixedThreadPool (2);Map<String, String> map = new HashMap<>();StampedLock lock = new StampedLock ();
executor.submit (() -> { long stamp = lock.writeLock (); try { sleep (1); map.put ("foo", " bar"); } finally { lock.unlockWrite (stamp); }});Runnable readTask = () -> { long stamp = lock.readLock (); try { System.out.println (map.get ("foo")); sleep (1); } finally { lock.unlockRead (stamp); }};executor.submit (readTask);executor.submit (readTask);stop (executor);

Працювати цей код буде так само, як і його брат-близнюк з ReadWriteLock. Тут, щоправда, варто згадати, що в StampedLock не реалізована реентерантність. Особливу увагу треба приділяти тому, щоб не потрапити в ситуацію взаємного блокування (deadlock).

У наступному прикладі демонструється “оптимістичне блокування”:

ExecutorService executor = Executors.newFixedThreadPool (2);StampedLock lock = new StampedLock ();executor.submit (() -> { long stamp = lock.tryOptimisticRead (); try { System.out.println ("Optimistic Lock Valid: " + lock.validate (stamp)); sleep (1); System.out.println ("Optimistic Lock Valid: " + lock.validate (stamp)); sleep (2); System.out.println ("Optimistic Lock Valid: " + lock.validate (stamp)); } finally { lock.unlock (stamp); }});executor.submit (() -> { long stamp = lock.writeLock (); try { System.out.println ("Write Lock acquired"); sleep (2); } finally { lock.unlock (stamp); System.out.println ("Write done"); }});stop (executor);

Оптимістичне блокування для читання, що викликається за допомогою методу tryOptimisticRead (), відрізняється тим, що воно завжди повертатиме “штамп”, не блокуючи поточний потік, незалежно від того, чи зайнятий ресурс, до якого воно звернулося. Якщо ресурс був заблокований блокуванням для запису, то повернений штамп дорівнюватиме нулю. У будь-який момент можна перевірити, чи є блокування валідним за допомогою lock.validate (stamp). Для наведеного вище коду результат буде таким:

Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false

Оптимістичне блокування є валидною з тієї миті, як йому вдалося захопити ресурс. На відміну від звичайних блокувань для читання, оптимістичне не забороняє іншим потокам блокувати ресурс для запису. Що ж відбувається в коді вище? Після захоплення ресурсу блокування є валидним і оптимістичний потік вирушає спати. В цей час інший потік блокує ресурси для запису, не чекаючи на завершення роботи читання. Починаючи з цього моменту, оптимістичне блокування перестає бути валідним (навіть після закінчення запису).

Таким чином, при використанні оптимістичних блокувань Вам треба постійно стежити за їх валідністю (перевіряти її треба вже після того, як виконані всі необхідні операції).

Іноді може бути корисним перетворити блокування для читання на блокування для запису, не вивільняючи ресурси. У StampedLock це можна зробити за допомогою методу tryConvertToWriteLock (), як у даному прикладі:

ExecutorService executor = Executors.newFixedThreadPool (2);StampedLock lock = new StampedLock ();executor.submit (() -> { long stamp = lock.readLock (); try { if (count == 0) { stamp = lock.tryConvertToWriteLock (stamp); if (stamp == 0l) { System.out.println ("Could not convert to write lock"); stamp = lock.writeLock (); } count = 23; } System.out.println (count); } finally { lock.unlock (stamp); }});stop (executor);

У даному прикладі ми хочемо прочитати значення змінної count і вивести його в консоль. Проте якщо значення дорівнює нулю, ми хочемо змінити його на 23. Для цього треба виконати перетворення з readLock на writeLock, щоб не перешкодити іншим потокам обробляти змінну. У разі якщо Ви викликали tryConvertToWriteLock () у той момент, коли ресурс зайнятий для запису іншим потоком, поточний потік зупинений не буде, проте метод поверне нульове значення. Тоді можна викликати writeLock () вручну.

Семафори

Семафори – відмінний спосіб обмежити кількість потоків, які одночасно працюють над тим самим ресурсом:

ExecutorService executor = Executors.newFixedThreadPool (10);Semaphore semaphore = new Semaphore (5);Runnable longRunningTask = () -> {
boolean permit = false; try { permit = semaphore.tryAcquire (1, TimeUnit.SECONDS); if (permit) { System.out.println ("Semaphore acquired"); sleep (5); } else { System.out.println ("Could not acquire semaphore"); } } catch (InterruptedException e) { throw new IllegalStateException (e); } finally { if (permit) { semaphore.release (); } }}IntStream.range (0, 10) .forEach (i -> executor.submit (longRunningTask));stop (executor);

У даному прикладі сервіс-виконавець може потенційно запустити всі 10 потоків, що викликаються, проте ми створили семафор, який обмежує кількість одночасно виконуваних потоків до п’яти. Знову нагадаю, що важливо звільняти ресурси саме у блоці finally{} на випадок виведення винятків.  Для пропонованого вище коду виведення буде таким:

Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Semaphore acquired
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore
Could not acquire semaphore

Це була друга частина серії статей про багатопотокове програмування. Настійно рекомендую розібрати наведені вище приклади самостійно. Усі вони, як завжди, доступні на GitHub. Можете сміливо форкати репозиторій і додавати його у вибране.

Сподіваюся, Вам сподобалася стаття. Якщо у Вас виникли запитання, то Ви можете задати їх у твітері.

Переклад статті “Java 8 Concurrency Tutorial: Synchronization and Locks”

Київ, Харків, Одеса, Дніпро, Запоріжжя, Кривий Ріг, Вінниця, Херсон, Черкаси, Житомир, Хмельницький, Чернівці, Рівне, Івано-Франківськ, Кременчук, Тернопіль, Луцьк, Ужгород, Кам'янець-Подільський, Стрий - за статистикою саме з цих міст програмісти найбільше переїжджають працювати до Львова. А Ви розглядаєте relocate?


Залишити відповідь

Ваша e-mail адреса не оприлюднюватиметься. Обов’язкові поля позначені *