Апишки описываются в виде классов, например
class Categories(JsonEndpoint): url = "http://127.0.0.1:8888/categories" params = {"page": range(100), "language": "en"} headers = {"User-Agent": get_user_agent} results_key = "*.slug" categories = Categories() class Posts(JsonEndpoint): url = "http://127.0.0.1:8888/categories/{category}/posts" params = {"page": range(100), "language": "en"} url_params = {"category": categories.iter_results()} results_key = "posts" async def comments(self, post): comments = Comments( self.session, url_params={"category": post.url.params["category"], "id": post["id"]}, ) return [comment async for comment in comments] posts = Posts()
В params и url_params могут быть функции(как здесь get_user_agent — возвращает случайный useragent), range, итераторы, awaitable и асинхронные итераторы(таким образом можно увязать их между собой).
В параметрах headers и cookies тоже могут быть функции и awaitable.
Апи категорий в примере выше возвращает массив объектов, у которых есть slug, итератор будет возвроащать именно их. Подсунув этот итератор в url_params постов, итератор пройдется рекурсивно по всем категориям и по всем страницам в каждой. Он прервется когда наткнется на 404 или какую-то другую ошибку и перейдет к следующей категории.
А репозитории есть пример aiohttp сервера для этих классов чтобы всё можно было протестировать.
Помимо get параметров можно передавать их как data или json и задать другой method.
results_key разбивается по точке и будет пытаться выдергивать ключи из результатов. Например «comments.*.text» вернет текст каждого комментария из массива внутри comments.
Результаты оборачиваются во wrapper у которого есть свойства url и params. url это производное строки, у которой тоже есть params. Таким образом можно узнать какие параметры использовались для получения данного результата Это демонстрируется в методе comments.
Также там есть базовый класс Sink для обработки результатов. Например, складывания их в mq или базу данных. Он работает в отдельных тасках и получает данные через asyncio.Queue.
class LoggingSink(Sink): def transform(self, obj): return repr(obj) async def init(self): from loguru import logger self.logger = logger async def process(self, obj): self.logger.info(obj) return True sink = LoggingSink(num_tasks=1)
Пример простейшего Sink. Метод transform позволяет провести какие-то манипуляции с объектом и вернуть None, если он нам не подходит. т.е. в тем также можно сделать валидацию.
Sink это асинхронный contextmanager, который при выходе по-идее будет ждать пока все объекты в очереди будут обработаны, потом отменит свои таски.
Ну и, наконец, для связки этого всего вместе я сделал класс Worker. Он принимает один endpoint и несколько sink`ов. Например,
worker = Worker(endpoint=posts, sinks=[loggingsink, mongosink]) worker.run()
run запустит asyncio.run_until_complete для pipeline`а worker`а. У него также есть метод transform.
Ещё есть класс WorkerGroup который позволяет создать сразу несколько воркеров и сделать asyncio.gather для них.
В коде есть пример сервера, который генерит данные через faker и обработчиков для его endpoint`ов. Думаю это нагляднее всего.
Всё это на ранней стадии развития и я пока что часто менял api. Но сейчас вроде пришел к тому как это должно выглядеть. Буду раз merge request`ам и комментариям к моему коду.
ссылка на оригинал статьи https://habr.com/ru/post/525912/
Добавить комментарий