Принципи реактивного програмування з використанням бібліотеки ReactiveX для Python на прикладі простого RSS-агрегатора


Дізнайтесь більше про нові кар'єрні можливості в EchoUA. Цікаві проекти, ринкова оплата, гарний колектив. Надсилайте резюме та приєднуйтеся до нас.

Останнім часом реактивне програмування в цілому, а технологія ReactiveX зокрема, набуває все більшої популярності серед розробників. Одні вже активно використовують усі переваги цього підходу, а інші тільки “щось чули”. Зі свого боку я спробую допомогти Вам уявити, наскільки деякі концепції реактивного програмування здатні змінити погляд на звичні, здавалося б, речі.

Є два принципово різних способи організації великих систем: відповідно до об’єктів і станів, які живуть у системі, і відповідно до потоків даних, які проходять через неї. Парадигма реактивного програмування припускає легкість вираження потоків даних, а також поширення змін завдяки цим потокам. Наприклад, в імперативному програмуванні операція привласнення означає кінцевість результату, тоді як в реактивному значення буде перераховано при отриманні нових вхідних даних. Потік значень проходить у системі ряд трансформацій, які потрібні для виконання певної задачі. Оперування потоками дозволяє системі бути розширюваною і асинхронною, а правильна реакція на виниклі помилки – відмовостійкою.

ReactiveX – бібліотека, що дозволяє створювати асинхронні й подієво-орієнтовані програми, що використовують спостережувані послідовності. Вона розширює шаблон Спостерігача для підтримки послідовностей даних, додає оператори для їх декларативного з’єднання, позбавляючи від необхідності піклуватися про синхронізацію і безпеку потоків, розділених структурах даних і неблокуючого I/O.

Однією з основних відмінностей бібліотеки ReactiveX від функціонального реактивного програмування є те, що вона оперує не такими, що безперервно змінюються, а дискретними значеннями, які “випускаються” протягом тривалого часу.

Варто трохи розповісти про те, що таке Observer, Observable, Subject. Модель Observable є джерелом даних і дозволяє обробляти потоки асинхронних подій подібно того, як Ви використовуєте для колекцій дані, наприклад, масиви. Все це замість колбеків, тобто код є більше читаним і менше схильним до помилок.

У ReactiveX спостерігач (Observer) підписується на Observable і згодом реагує на елемент або послідовність елементів, які той відправляє. У кожного Observer, підписаного на Observable, викликається метод Observer.on_next () на кожен елемент потоку даних, після якого може бути викликаний як Observer.on_complete (), так і Observer.on_error (). Часто Observable застосовується таким чином, що він не починає віддавати дані до тих пір, поки хто-небудь не підписується на нього. Це так звані “ледачі обчислення” – значення обчислюються тільки тоді, коли в них виникає потреба.

Бувають задачі, для виконання яких треба з’єднати Observer і Observable, щоб приймати повідомлення про події і повідомляти про них своїм підписникам. Для цього є Subject, що має, крім стандартної, ще декілька реалізацій:

  • ReplaySubject має можливість кеширувати всі дані, що надійшли до нього, а за появи нового підписника – віддавати всю послідовність спочатку, працюючи далі в звичайному режимі.
  • BehaviorSubject зберігає останнє значення, за аналогією з ReplaySubject віддаючи його підписникові, який з’явився. При створенні він набуває значення за умовчанням, яке отримуватиме кожен новий підписник, якщо останнього значення ще не було.
  • AsyncSubject також зберігає останнє значення, але не віддає дані, поки не завершиться вся послідовність.

Observable і Observer – тільки початок ReactiveX. Вони не несуть у потужності, яку являють собою оператори, що дозволяють трансформувати, об’єднувати, маніпулювати послідовностями елементів, які віддають Observable.

У документації ReactiveX опис операторів включає використання Marble Diagram. Наприклад, ось як ці діаграми представляють Observable та їх трансформації:

Дивлячись на діаграму, подану нижче, легко зрозуміти, що оператор map трансформує елементи, що віддаються Observable, шляхом застосування функції до кожного з них:

Яскравою ілюстрацією можливостей ReactiveX є додаток RSS-агрегатора. Тут постає необхідність асинхронного завантаження даних, фільтрації і трансформації значень, підтримки актуального стану шляхом періодичного оновлення.

У цій статті приклади для представлення основних принципів ReactiveX написані з використанням бібліотеки rx для мови програмування Python. Ось так, наприклад, виглядає абстрактна реалізація спостерігача:

class Observer (metaclass=ABCMeta): @abstractmethod def on_next (self, value): return NotImplemented @abstractmethod def on_error (self, error): return NotImplemented @abstractmethod def on_completed (self) :
return NotImplemented

Наший додаток у режимі реального часу обмінюватиметься сполученнями з браузером за допомогою веб-сокетів. Можливість легко реалізувати це надає Tornado.

Робота програми розпочинається із запуску сервера. При зверненні браузера до сервера відкривається веб-сокет.

import jsonimport osimport feedparserfrom rx import config, Observablefrom rx.subjects import Subjectfrom tornado.escape import json_decodefrom tornado.httpclient import AsyncHTTPClientfrom tornado.platform.asyncio import AsyncIOMainLoopfrom tornado.web import Application, RequestHandler, StaticFileHandler, urlfrom tornado.websocket import WebSocketHandlerasyncio = config['asyncio']
class WSHandler (WebSocketHandler): urls =['https://lenta.ru/rss/top7', 'http://wsrss.bbc.co.uk/russian/index.xml'] def open (self): print ("WebSocket opened")  # тут буде основна логіка нашого додатка def on_message (self, message): obj = json_decode (message)  # Відправляє повідомлення, яке отримує user_input self.subject.on_next (obj['term']) def on_close (self): # Відписатися від Observable; по ланцюжку зупинить роботу усіх observable self.combine_latest_sbs.dispose () print ("WebSocket closed") class MainHandler (RequestHandler): def get (self): self.render ("index.html") def main (): AsyncIOMainLoop ().install () port = os.environ.get ("PORT", 8080) app = Application ([ url (r"/", MainHandler)(r'/ws', WSHandler)(r'/static/#00', StaticFileHandler, {'path': "".}) ])
print ("Starting server at port: %s" % port) app.listen (port) asyncio.get_event_loop ().run_forever ()

Для обробки введеного користувачем запиту створюється Subject, при підписці на який він відправляє значення за умовчанням (у нашому випадку – порожній рядок), а потім раз на секунду відправляє те, що введено користувачем і задовольняє умовам: довжина 0 або більше 2, значення змінилося.

 # Subject одночасно і Observable, і Observer self.subject = Subject () user_input = self.subject.throttle_last ( 1000 # На заданому тимчасовому проміжку набувати останнього значення ).start_with ( '' # Відразу ж після підписки відправляє значення за умовчанням ).filter ( lambda text: len (text) == 0 or len (text) > 2
).distinct_until_changed ()  # Тільки якщо значення змінилося

Також для періодичного оновлення новин передбачений Observable, який раз на 60 секунд віддає значення.

 interval_obs = Observable.interval ( 60000 # Віддає значення разів в 60с (для періодичного оновлення) ).start_with (0)

Два ці потоки з’єднуються оператором combine_latest, у ланцюжок вбудовується Observable для отримання списку новин. Після чого на цей Observable створюється підписка, весь ланцюжок починає працювати тільки у цей момент.

 # combine_latest збирає 2 потоки із запитів користувача і тимчасових # інтервалів, спрацьовує на будь-яке повідомлення з кожного потоку
self.combine_latest_sbs = user_input.combine_latest ( interval_obs, lambda input_val, i: input_val ).do_action ( # Спрацьовує на кожен випущений елемент # Відправляє повідомлення для очищення списку на фронтенд lambda x: send_response ('clear') ).flat_map ( # У ланцюжок вбудовується Observable для отримання списку self.get_data ).subscribe (send_response, on_error)  # Створюється підписка; увесь ланцюжок починає працювати тільки в цей момен

Слід детальніше зупинитися на тому, що таке “Observable для отримання списку новин”. Зі списку url для отримання новин ми створюємо потік даних, елементи якого приходять у функцію, де за допомогою HTTP- клієнта Tornado AsyncHTTPClient відбувається асинхронне завантаження даних для кожного елементу списку urls. З них також створюється потік даних, який фільтрується за запитом, введеним користувачем. З кожного потоку ми беремо по 5 новин, які приводимо до потрібного формату для відправки на фронтенд.

 def get_rss (self, rss_url): http client = AsyncHTTPClient () return http client.fetch#01 def get_data (self, query): # Observable створюється зі списку url return Observable.from_list ( self.urls ).flat_map(
# Для кожного url створюється Observable, який завантажує дані lambda url: Observable.from_future (self.get_rss (url))  ).flat_map ( # Отримані дані парсятся, з них створюється Observable lambda x: Observable.from_list ( feedparser.parse (x.body)['entries'] ).filter ( # Фільтрує по входженню запиту в заголовок або текст новини lambda val, i: query in val.title or query in val.summary ).take (5)  # Беремо тільки по 5 новин по кожному url ).map (lambda x: {'title': x.title, 'link': x.link, 'published': x.published, 'summary': x.summary})  # Перетворить дані для відправки на фронтенд

Після того, як потік вихідних даних сформований, його передплатник починає поелементно отримувати дані. Функція send_response відправляє отримані значення у фронтенд, який додає новину в список.

 def send_response (x): self.write_message (json.dumps (x)) def on_error (ex): print (ex)

У файлі feeder.js

ws.onmessage = function (msg) { var value = JSON.parse (msg.data); if (value === " clear") {$results.empty (); return;} // Append the results $ ('

‘ ).appendTo ($results); $results.show ();}

Таким чином, реалізується push-технологія, в якій дані надходять від сервера до фронтенду, який лише відправляє введений користувачем запит для пошуку по новинах.

Підсумовуючи викладене, пропоную замислитися над тим, яка реалізація вийшла б при звичному підході з використанням колбеків замість Observable, без можливості легко об’єднати потоки даних, без можливості миттєвої відправки даних користувачеві-фронтенду і з необхідністю відстежувати зміни в рядку запиту. Серед Python-розробників технологія поки що практично не поширена, проте я бачу вже декілька можливостей її застосувати на поточних проектах.

Приклад використання ReactiveX для Python Ви можете знайти в GitHub репозиторії з демо-проектом RSS- агрегатора.


Висловлюємо вдячність Ксенії, Python-розробникові компанії Noveo, за детальний опис переваг реактивного програмування!

Київ, Харків, Одеса, Дніпро, Запоріжжя, Кривий Ріг, Вінниця, Херсон, Черкаси, Житомир, Хмельницький, Чернівці, Рівне, Івано-Франківськ, Кременчук, Тернопіль, Луцьк, Ужгород, Кам'янець-Подільський, Стрий - за статистикою саме з цих міст програмісти найбільше переїжджають працювати до Львова. А Ви розглядаєте relocate?


Залишити відповідь

Ваша e-mail адреса не оприлюднюватиметься. Обов’язкові поля позначені *