Багатопотокове програмування в Java 8. Частина перша. Паралельне виконання коду за допомогою потоків


Дізнайтесь більше про нові кар'єрні можливості в EchoUA. Цікаві проекти, ринкова оплата, гарний колектив. Надсилайте резюме та приєднуйтеся до нас.

Розповідає Бенджамін Вінтерберг, Software Engineer


Ласкаво просимо, до першої частини керівництва по паралельному програмуванню в Java 8. У цій частині ми на прикладах розглянемо, як виконувати код паралельно, за допомогою потоків, задач і сервісів виконавців.

Concurrency API було представлено водночас із виходом Java 5, відтоді відбувався розвиток із кожною новою версією Java. Більшу частину прикладів можна реалізувати на ранніх версіях, однак у цій статті я збираюсь використовувати лямбда-вирази. Якщо Ви ще не ознайомлені з нововведеннями Java 8, рекомендую опрацювати моє керівництво.

 

Потоки і задачі

Усі сучасні операційні системи підтримують паралельне виконання коду за допомогою процесів і потоків. Процес – це екземпляр програми, який запускається незалежно від інших. Наприклад, коли Ви запускаєте програму на Java, ОС створює новий процес, який працює паралельно іншим. Усередині процесів ми можемо використати потоки, у такий спосіб отримавши від процесора максимум можливостей.

Потоки (threads) у Java підтримуються, починаючи з JDK 1.0. До того як запустити потік, йому потрібно надати ділянку коду, що зазвичай називають “задача” (task). Це здійснюють через реалізацію інтерфейса Runnable, у якого є тільки один метод без аргументів, що повертає  voidrun (). Ось приклади того, як це працює:

Runnable task = () -> { String threadName = Thread.currentThread ().getName (); System.out.println ("Hello " + threadName);};task.run ();Thread thread = new Thread (task);thread.start ();System.out.println ("Done"!);

Оскільки інтерфейс Runnable функціональний, ми можемо використати лямбда-вирази, які з’явилися в Java 8. У прикладі ми створюємо задачу, яка виводить ім’я поточного потоку на консоль, і запускаємо її спочатку в головному потоці, а потім – в окремому.

Результат виконання цього коду може виглядати так:

Hello mainHello Thread - 0done!

чи так:

Hello mainDone!Hello Thread - 0

Через паралельне виконання ми не можемо сказати, буде наш потік запущений до або після виведення “Done!” на екран. Ця особливість робить паралельне програмування складною задачею у великих додатках.

Потоки можуть бути призупинені на деякий час. Це дуже корисно, якщо ми хочемо симулювати задачу, що довго виконується. Наприклад, так:

Runnable runnable = () -> { try { String name = Thread.currentThread ().getName (); System.out.println ("Foo " + name); TimeUnit.SECONDS.sleep (1); System.out.println ("Bar " + name); } catch (InterruptedException e) { e.printStackTrace (); }};Thread thread = new Thread (runnable);thread.start ();

Коли Ви запустите цей код, то побачите секундну затримку між виведенням першого і другого рядка на екран. TimeUnit – корисний клас для роботи з одиницями часу, але те саме можна зробити за допомогою Thread.sleep (1000).

Працювати з потоками безпосередньо незручно, що спричиняє помилки. Тому в 2004 році в Java 5 додали Concurrency API. Він знаходиться в пакеті java.util.concurrent і містить велику кількість корисних класів і методів для багатопотокового програмування. Відтоді Concurrency API безперервно розвивається.

Давайте тепер детальніше розглянемо одну з найважливіших частин Concurrency API – сервіс виконавців (executor services).

Виконавці

Concurrency API вводить поняття сервісу-виконавця (ExecutorService) – високорівневу заміну роботи з потоками безпосередньо. Виконавці виконують задачу асихронно і зазвичай використовують пул потоків, так що нам не потрібно створювати їх вручну. Всі потоки з пулу будуть використані повторно після виконання задачі, тобто ми можемо створити в додатку стільки задач, скільки хочемо, використовуючи одного виконавця.

Ось як виглядатиме наш перший приклад з використанням виконавця:

ExecutorService executor = Executors.newSingleThreadExecutor ();executor.submit (() -> { String threadName = Thread.currentThread ().getName (); System.out.println ("Hello " + threadName);});// => Hello pool - 1 - thread - 1

Клас Executors надає зручні методи-фабрики для створення різних сервісів виконавців. У даному випадку ми використали виконавця з одним потоком.

Результат виглядає так само, як і попереднього разу. Цей код має важливу відмінність – він ніколи не зупиниться. Роботу виконавців потрібно завершувати явно. Для цього в інтерфейсі ExecutorService є два методи: shutdown (), який чекає завершення запущених задач, і shutdownNow (), який зупиняє виконавця негайно.

Вважаю за краще зупиняти виконавців:

try { System.out.println ("attempt to shutdown executor"); executor.shutdown (); executor.awaitTermination (5, TimeUnit.SECONDS);}catch (InterruptedException e) { System.err.println ("tasks interrupted");}finally {
if (!executor.isTerminated()) { System.err.println ("cancel non - finished tasks"); } executor.shutdownNow (); System.out.println ("shutdown finished");}

Виконавець намагається завершити роботу, чекаючи завершення запущених задач протягом певного часу (5 сек.). Після закінчення цього часу він зупиняється, перериваючи всі незавершені задачі.

Callable і Future

Крім Runnable, виконавці можуть приймати інший вид задач, який називається Callable. Callable – це також функціональний інтерфейс, але, на відміну від Runnable, він може повертати значення.

Давайте напишемо задачу, яка повертає ціле число після секундної паузи:

Callable task = () -> { try { TimeUnit.SECONDS.sleep (1); return 123; }
catch (InterruptedException e) { throw new IllegalStateException ("task interrupted", e); }};

Callable-задачі також можуть бути передані виконавцям. Як тоді отримати результат, який вони повертають? Оскільки метод submit () не чекає завершення задачі, виконавець не може повернути результат задачі безпосередньо. Замість цього виконавець повертає спеціальний об’єкт Future, у якого ми зможемо запросити результат задачі.

ExecutorService executor = Executors.newFixedThreadPool (1);Future future = executor.submit (task);System.out.println ("future done? " + future.isDone());Integer result = future.get ();System.out.println ("future done? " + future.isDone());System.out.print ("result: " + result);

Після відправки задачі виконавцеві ми спочатку перевіряємо, чи завершено її виконання, за допомогою методу isDone (). Оскільки задача має затримку в одну секунду, до того як повернути число, я впевнений, що вона ще не завершена.

Виклик методу get () блокує потік і чекає завершення задачі, а потім повертає результат її виконання. Тепер future.isDone () поверне true, і ми побачимо на консолі наступне:

future done? falsefuture done? trueresult: 123

Задачі жорстко пов’язані із сервісом виконавців, і, якщо Ви його зупините, спроба отримати результат задачі виведе виняток:

executor.shutdownNow ();future.get ();

Ви, можливо, помітили, що цього разу ми створюємо сервіс трохи по-іншому: за допомогою методу newFixedThreadPool (1), який поверне виконавця з пулом в один потік. Це еквівалентно виклику методу newSingleThreadExecutor (), проте ми можемо змінити кількість потоків у пулі.

Тайм-аут

Будь-який виклик методу future.get () блокує потік до тих пір, поки задача не буде завершена. У найгіршому випадку виконання задачі не завершиться ніколи, блокуючи Ваший додаток. Уникнути цього можна, передавши тайм-аут:

ExecutorService executor = Executors.newFixedThreadPool (1);Future future = executor.submit (() -> { try { TimeUnit.SECONDS.sleep (2); return 123; } catch (InterruptedException e){ throw new IllegalStateException ("task interrupted", e);
}});future.get (1, TimeUnit.SECONDS);

Виконання цього коду викличе TimeoutException:

Exception in thread " main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get (FutureTask.java:205)

Ви вже, можливо, здогадалися, чому було виведено цей виняток: ми вказали максимальний час очікування виконання задачі – секунду, тоді як її виконання триває дві секунди.

InvokeAll

Виконавці можуть приймати список задач на виконання за допомогою методу invokeAll (), який приймає колекцію callable-задач і повертає список з Future.

ExecutorService executor = Executors.newWorkStealingPool ();List<Callable> callables = Arrays.asList(

Інший спосіб віддати на виконання декілька задач – метод invokeAny (). Він працює трохи інакше: замість повернення Future він блокує потік до того, як завершиться хоча б одна задача, і повертає її результат.

Щоб показати, як працює цей метод, створимо метод, який емулює поведінку різних задач. Він повертатиме Callable, який поверне вказаний рядок після необхідної затримки:

Callable callable (String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep (sleepSeconds); return result; };}

Використаємо цей метод, щоб створити декілька задач із різними рядками і затримками від однієї до трьох секунд. Відправка цих задач виконавцеві через метод invokeAny () поверне результат задачі з найменшою затримкою. В даному випадку це “task2”:

ExecutorService executor = Executors.newWorkStealingPool ();List<Callable> callables = Arrays.asList ( callable ("task1", 2), callable ("task2", 1), callable ("task3", 3));String result = executor.invokeAny (callables);System.out.println (result);// => task2

У прикладі, наведеному вище, використаний ще один вид виконавців, який створюється за допомогою методу newWorkStealingPool (). Цей метод з’явився в Java 8 і поводиться не так, як інші: замість використання фіксованої кількості потоків він створює ForkJoinPool з певним паралелізмом (parallelism size), що за умовчанням дорівнює кількості ядер машини.

ForkJoinPool уперше з’явився в Java 7, і ми розглянемо його детальніше в наступних частинах нашого керівництва. Тепер давайте подивимося на виконавців з планувальником (scheduled executors).

Виконавці з планувальником

Ми вже знаємо, як віддати завдання виконавцеві й отримати результат. Для того щоб періодично запускати задачу, ми можемо використати пул потоків з планувальником.

ScheduledExecutorService здатний запускати задачу один або кілька разів із заданим інтервалом.

Цей приклад показує, як змусити виконавця виконати задачу протягом трьох секунд:

ScheduledExecutorService executor = Executors.newScheduledThreadPool (1);Runnable task = () -> System.out.println ("Scheduling: " + System.nanoTime());ScheduledFuture<?> future = executor.schedule (task, 3, TimeUnit.SECONDS);TimeUnit.MILLISECONDS.sleep (1337);long remainingDelay = future.getDelay (TimeUnit.MILLISECONDS);System.out.printf ("Remaining Delay: %sms", remainingDelay);

Коли ми передаємо задачу планувальникові, він повертає особливий тип FutureScheduledFuture, який надає метод getDelay () для отримання часу, що залишився до запуску.

У виконавця з планувальником є два методи для установки задач: scheduleAtFixedRate () і scheduleWithFixedDelay (). Перший установлює задачу з певним інтервалом, наприклад, в одну секунду:

ScheduledExecutorService executor = Executors.newScheduledThreadPool (1);Runnable task = () -> System.out.println ("Scheduling: " + System.nanoTime());int initialDelay = 0;int period = 1;executor.scheduleAtFixedRate (task, initialDelay, period, TimeUnit.SECONDS);

Крім того, він приймає початкову затримку, яка визначає час до першого запуску.

Зверніть увагу, що метод scheduleAtFixedRate () не бере до розрахунку час виконання задачі. Так, якщо Ви поставите задачу, яка виконується протягом двох секунд, з інтервалом в одну, пул потоків рано чи пізно переповниться.

У цьому випадку необхідно використати метод scheduleWithFixedDelay (). Він працює приблизно так само, як і попередній, але вказаний інтервал відлічуватиметься від часу завершення попередньої задачі.

ScheduledExecutorService executor = Executors.newScheduledThreadPool (1);Runnable task = () -> { try { TimeUnit.SECONDS.sleep (2); System.out.println ("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println ("task interrupted"); }};executor.scheduleWithFixedDelay (task, 0, 1, TimeUnit.SECONDS);

У наведеному прикладі ми ставили задачу із затримкою в одну секунду між закінченням виконання задачі й початком наступної. Початкової затримки немає, і кожна задача виконується за дві секунди. Так, задачі запускатимуться на 0, 3, 6, 9 і т. д. секунді. Як бачите, метод scheduleWithFixedDelay () дуже корисний, якщо ми не можемо заздалегідь сказати, як довго виконуватиметься задача.

Це була перша частина серії статей про багатопотокове програмування. Настійно рекомендую розібрати наведені вище приклади самостійно. Всі вони доступні на GitHub. Можете сміливо форкати репозиторій і додавати його обране.

Сподіваюся, Вам сподобалася стаття. Якщо у Вас виникли запитання, Ви можете поставити їх у твітері.

Переклад статті “Java 8 Concurrency Tutorial: Threads and Executors

Київ, Харків, Одеса, Дніпро, Запоріжжя, Кривий Ріг, Вінниця, Херсон, Черкаси, Житомир, Хмельницький, Чернівці, Рівне, Івано-Франківськ, Кременчук, Тернопіль, Луцьк, Ужгород, Кам'янець-Подільський, Стрий - за статистикою саме з цих міст програмісти найбільше переїжджають працювати до Львова. А Ви розглядаєте relocate?


Залишити відповідь

Ваша e-mail адреса не оприлюднюватиметься. Обов’язкові поля позначені *