
Дізнайтесь більше про нові кар'єрні можливості в EchoUA. Цікаві проекти, ринкова оплата, гарний колектив. Надсилайте резюме та приєднуйтеся до нас.
Останнім часом реактивне програмування в цілому, а технологія ReactiveX зокрема, набуває все більшої популярності серед розробників. Одні вже активно використовують усі переваги цього підходу, а інші тільки “щось чули”. Зі свого боку я спробую допомогти Вам уявити, наскільки деякі концепції реактивного програмування здатні змінити погляд на звичні, здавалося б, речі.
Є два принципово різних способи організації великих систем: відповідно до об’єктів і станів, які живуть у системі, і відповідно до потоків даних, які проходять через неї. Парадигма реактивного програмування припускає легкість вираження потоків даних, а також поширення змін завдяки цим потокам. Наприклад, в імперативному програмуванні операція привласнення означає кінцевість результату, тоді як в реактивному значення буде перераховано при отриманні нових вхідних даних. Потік значень проходить у системі ряд трансформацій, які потрібні для виконання певної задачі. Оперування потоками дозволяє системі бути розширюваною і асинхронною, а правильна реакція на виниклі помилки – відмовостійкою.
ReactiveX – бібліотека, що дозволяє створювати асинхронні й подієво-орієнтовані програми, що використовують спостережувані послідовності. Вона розширює шаблон Спостерігача для підтримки послідовностей даних, додає оператори для їх декларативного з’єднання, позбавляючи від необхідності піклуватися про синхронізацію і безпеку потоків, розділених структурах даних і неблокуючого I/O.
Однією з основних відмінностей бібліотеки ReactiveX від функціонального реактивного програмування є те, що вона оперує не такими, що безперервно змінюються, а дискретними значеннями, які “випускаються” протягом тривалого часу.
Варто трохи розповісти про те, що таке Observer, Observable, Subject. Модель Observable є джерелом даних і дозволяє обробляти потоки асинхронних подій подібно того, як Ви використовуєте для колекцій дані, наприклад, масиви. Все це замість колбеків, тобто код є більше читаним і менше схильним до помилок.
У ReactiveX спостерігач (Observer) підписується на Observable і згодом реагує на елемент або послідовність елементів, які той відправляє. У кожного Observer, підписаного на Observable, викликається метод Observer.on_next ()
на кожен елемент потоку даних, після якого може бути викликаний як Observer.on_complete ()
, так і Observer.on_error ()
. Часто Observable застосовується таким чином, що він не починає віддавати дані до тих пір, поки хто-небудь не підписується на нього. Це так звані “ледачі обчислення” – значення обчислюються тільки тоді, коли в них виникає потреба.
Бувають задачі, для виконання яких треба з’єднати Observer і Observable, щоб приймати повідомлення про події і повідомляти про них своїм підписникам. Для цього є Subject, що має, крім стандартної, ще декілька реалізацій:
- ReplaySubject має можливість кеширувати всі дані, що надійшли до нього, а за появи нового підписника – віддавати всю послідовність спочатку, працюючи далі в звичайному режимі.
- BehaviorSubject зберігає останнє значення, за аналогією з ReplaySubject віддаючи його підписникові, який з’явився. При створенні він набуває значення за умовчанням, яке отримуватиме кожен новий підписник, якщо останнього значення ще не було.
- AsyncSubject також зберігає останнє значення, але не віддає дані, поки не завершиться вся послідовність.
Observable і Observer – тільки початок ReactiveX. Вони не несуть у потужності, яку являють собою оператори, що дозволяють трансформувати, об’єднувати, маніпулювати послідовностями елементів, які віддають Observable.
У документації ReactiveX опис операторів включає використання Marble Diagram. Наприклад, ось як ці діаграми представляють Observable та їх трансформації:
Дивлячись на діаграму, подану нижче, легко зрозуміти, що оператор map трансформує елементи, що віддаються Observable, шляхом застосування функції до кожного з них:
Яскравою ілюстрацією можливостей ReactiveX є додаток RSS-агрегатора. Тут постає необхідність асинхронного завантаження даних, фільтрації і трансформації значень, підтримки актуального стану шляхом періодичного оновлення.
У цій статті приклади для представлення основних принципів ReactiveX написані з використанням бібліотеки rx для мови програмування Python. Ось так, наприклад, виглядає абстрактна реалізація спостерігача:
class Observer (metaclass=ABCMeta): @abstractmethod def on_next (self, value): return NotImplemented @abstractmethod def on_error (self, error): return NotImplemented @abstractmethod def on_completed (self) :
return NotImplemented
Наший додаток у режимі реального часу обмінюватиметься сполученнями з браузером за допомогою веб-сокетів. Можливість легко реалізувати це надає Tornado.
Робота програми розпочинається із запуску сервера. При зверненні браузера до сервера відкривається веб-сокет.
import jsonimport osimport feedparserfrom rx import config, Observablefrom rx.subjects import Subjectfrom tornado.escape import json_decodefrom tornado.httpclient import AsyncHTTPClientfrom tornado.platform.asyncio import AsyncIOMainLoopfrom tornado.web import Application, RequestHandler, StaticFileHandler, urlfrom tornado.websocket import WebSocketHandlerasyncio = config['asyncio']
class WSHandler (WebSocketHandler): urls =['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open (self): print ("WebSocket opened") # тут буде основна логіка нашого додатка def on_message (self, message): obj = json_decode (message) # Відправляє повідомлення, яке отримує user_input self.subject.on_next (obj['term']) def on_close (self): # Відписатися від Observable; по ланцюжку зупинить роботу усіх observable self.combine_latest_sbs.dispose () print ("WebSocket closed") class MainHandler (RequestHandler): def get (self): self.render ("index.html") def main (): AsyncIOMainLoop ().install () port = os.environ.get ("PORT", 8080) app = Application ([ url (r"/", MainHandler)(r'/ws', WSHandler)(r'/static/#00', StaticFileHandler, {'path': "".}) ])
print ("Starting server at port: %s" % port) app.listen (port) asyncio.get_event_loop ().run_forever ()
Для обробки введеного користувачем запиту створюється Subject, при підписці на який він відправляє значення за умовчанням (у нашому випадку – порожній рядок), а потім раз на секунду відправляє те, що введено користувачем і задовольняє умовам: довжина 0 або більше 2, значення змінилося.
# Subject одночасно і Observable, і Observer self.subject = Subject () user_input = self.subject.throttle_last ( 1000 # На заданому тимчасовому проміжку набувати останнього значення ).start_with ( '' # Відразу ж після підписки відправляє значення за умовчанням ).filter ( lambda text: len (text) == 0 or len (text) > 2
).distinct_until_changed () # Тільки якщо значення змінилося
Також для періодичного оновлення новин передбачений Observable, який раз на 60 секунд віддає значення.
interval_obs = Observable.interval ( 60000 # Віддає значення разів в 60с (для періодичного оновлення) ).start_with (0)
Два ці потоки з’єднуються оператором combine_latest,
у ланцюжок вбудовується Observable для отримання списку новин. Після чого на цей Observable створюється підписка, весь ланцюжок починає працювати тільки у цей момент.
# combine_latest збирає 2 потоки із запитів користувача і тимчасових # інтервалів, спрацьовує на будь-яке повідомлення з кожного потоку
self.combine_latest_sbs = user_input.combine_latest ( interval_obs, lambda input_val, i: input_val ).do_action ( # Спрацьовує на кожен випущений елемент # Відправляє повідомлення для очищення списку на фронтенд lambda x: send_response ('clear') ).flat_map ( # У ланцюжок вбудовується Observable для отримання списку self.get_data ).subscribe (send_response, on_error) # Створюється підписка; увесь ланцюжок починає працювати тільки в цей момен
Слід детальніше зупинитися на тому, що таке “Observable для отримання списку новин”. Зі списку url для отримання новин ми створюємо потік даних, елементи якого приходять у функцію, де за допомогою HTTP- клієнта Tornado AsyncHTTPClient відбувається асинхронне завантаження даних для кожного елементу списку urls. З них також створюється потік даних, який фільтрується за запитом, введеним користувачем. З кожного потоку ми беремо по 5 новин, які приводимо до потрібного формату для відправки на фронтенд.
def get_rss (self, rss_url): http client = AsyncHTTPClient () return http client.fetch#01 def get_data (self, query): # Observable створюється зі списку url return Observable.from_list ( self.urls ).flat_map(
# Для кожного url створюється Observable, який завантажує дані lambda url: Observable.from_future (self.get_rss (url)) ).flat_map ( # Отримані дані парсятся, з них створюється Observable lambda x: Observable.from_list ( feedparser.parse (x.body)['entries'] ).filter ( # Фільтрує по входженню запиту в заголовок або текст новини lambda val, i: query in val.title or query in val.summary ).take (5) # Беремо тільки по 5 новин по кожному url ).map (lambda x: {'title': x.title, 'link': x.link, 'published': x.published, 'summary': x.summary}) # Перетворить дані для відправки на фронтенд
Після того, як потік вихідних даних сформований, його передплатник починає поелементно отримувати дані. Функція send_response
відправляє отримані значення у фронтенд, який додає новину в список.
def send_response (x): self.write_message (json.dumps (x)) def on_error (ex): print (ex)
У файлі feeder.js
ws.onmessage = function (msg) { var value = JSON.parse (msg.data); if (value === " clear") {$results.empty (); return;} // Append the results $ ('
- ‘ + value.title +’‘ + value.published + ” + value.summary + ‘
‘ ).appendTo ($results); $results.show ();}
Таким чином, реалізується push-технологія, в якій дані надходять від сервера до фронтенду, який лише відправляє введений користувачем запит для пошуку по новинах.
Підсумовуючи викладене, пропоную замислитися над тим, яка реалізація вийшла б при звичному підході з використанням колбеків замість Observable, без можливості легко об’єднати потоки даних, без можливості миттєвої відправки даних користувачеві-фронтенду і з необхідністю відстежувати зміни в рядку запиту. Серед Python-розробників технологія поки що практично не поширена, проте я бачу вже декілька можливостей її застосувати на поточних проектах.
Приклад використання ReactiveX для Python Ви можете знайти в GitHub репозиторії з демо-проектом RSS- агрегатора.
Висловлюємо вдячність Ксенії, Python-розробникові компанії Noveo, за детальний опис переваг реактивного програмування!
Київ, Харків, Одеса, Дніпро, Запоріжжя, Кривий Ріг, Вінниця, Херсон, Черкаси, Житомир, Хмельницький, Чернівці, Рівне, Івано-Франківськ, Кременчук, Тернопіль, Луцьк, Ужгород, Кам'янець-Подільський, Стрий - за статистикою саме з цих міст програмісти найбільше переїжджають працювати до Львова. А Ви розглядаєте relocate?