Привет, Хабр! Меня зовут Артемий Кравцов, я работаю инженером в Wildberries.
Про ClickHouse как MPP-систему (Massively Parallel Processing) не так много информации можно найти в открытом доступе, но многое можно рассказать. Я хочу поделиться своим опытом и надеюсь, что мой рассказ поможет кому-нибудь сформировать целостную картину о том, как в ClickHouse спроектированы распределённые вычисления.
-
Если вам проще воспринимать на слух, предлагаю послушать выступление с этой темой на бигдатнике от WB: <ссылка на YouTube>
-
Возможно, вам будет полезна предыдущая статья про то, как организовано хранение данных в ClickHouse: <ссылка на Хабр>
План статьи
-
Объявление кластера
-
Движок Distributed
-
Вставки на кластер
-
VALUES
-
INSERT SELECT
-
MATERIALIZED VIEW
-
-
Распределённые чтения
-
Агрегатные функции
-
GROUP BY
-
JOIN
-
WITH
-
Кластер в ClickHouse — это нескольких инстансов ClickHouse, логически объединённых вместе. Чтобы тот или иной инстанс знал про существование кластера, в его конфиге должна быть секция remote_servers с описанием схемы кластера. По этой схеме инстансы понимают про себя, входят ли они в какой-либо кластер, и узнают о своих «соседях» по кластеру. На всех нодах, которые планируется объединять, объявление кластера в конфиге обычно делают одинаковым.
Создаём кластер, который называется my_clh_cluster. Внутри кластера пять шардов, в каждый шард вкладываем по одному инстансу в качестве единственной реплики. Для каждого инстанса указываем домен и порт, по которым до него можно достучаться.
То, что инстансы знают друг о друге через объявление кластера, не означает, что они как-то взаимодействуют. В объявлении можно написать любую ерунду — нафантазируем, например, что в кластере есть Клик на каком-то хосте, которого в действительности не существует:
И если мы так напишем, то Клики нормально поднимутся и будут работать. Мы даже увидим в составе кластера несуществующий хостнейм:
Ошибка всплывёт, когда мы захотим что-то сделать на кластере. Тогда Клик достанет из контекста, в котором запущен, схему кластера, и окажется, что в этой схеме один из адресов ведёт не в другой Клик, а в никуда:
Пробуем создать юзера запросом CREATE USER ON CLUSTER. Клики на четырёх настоящих хостах создали юзера и окнули в ответ на запрос, а вот последний Клик не вышел на связь с Keeper-ом и спустя какое-то время запрос отвалился по таймауту.
Единственная синхронизация между машинами кластера тут оказалась на уровне Кипера, и заключается она лишь в том, что когда последняя нода станет доступна, запросы с синтаксисом ON CLUSTER на ней выполнятся в том порядке, в каком пришли. На остальных четырёх нодах в это же время всё в порядке. Они никак не отреагируют на ошибку, которую получит пользователь — роллбека команды на создание юзера на них не произойдёт. Ведут себя эгоистично, хотя работают в составе кластера!
Можно иметь кластер, но им не пользоваться — инстансы Клика самодостаточны. Им не нужен кластер. У вас вполне может быть несколько отдельных Кликхаусов, каждый со своими данными, своей конфигурацией и со своими пользователями. Даже разных версий. Каждый будет жить своей жизнью. И пользоваться вы ими будете как single-node’ами. Внутри одного кластера они будут объявлены только лишь для удобства администрирования: время от времени создавать, например, пользователя на всех нодах через CREATE USER ON CLUSTER.
Помимо администрирования, кластер может быть нужен для репликации.
Создам реплицированную таблицу. В ней будет одна числовая колонка, по которой таблица будет отсортирована.
Репликация работает не за счёт того, что мы дописали ON CLUSTER: можно было и руками сходить на машины по отдельности — ничего бы не изменилось. И дело не в том, что машины объявлены репликами внутри одного шарда в xml-конфиге, ведь это не так — сейчас ноды в разных шардах, и внутри своих шардов они — в единственном экземпляре. Тем не менее, репликация будет работать.
Репликация работает, если на всех машинах, содержащих реплики таблицы, первый аргумент движка Replicated после раскрытия макросов совпадает, а второй аргумент не совпадает, и используется общий Keeper.
Вставим в реплицированную таблицу 500 чисел, от 0 до 499. Я хочу продемонстрировать — то, что в схеме кластера объявлено пять шардов, ни к чему не обязывает. Мы не обязаны шардировать данные просто потому, что объявили шардированный кластер.
То, что мы объявляем пять шардов, говорит о намерении делить таблицы на пять непересекающихся частей (шардов) и хранить их на разных физических носителях. В нашем случае — на разных машинах. Суммарного места на дисках потребуется приблизительно столько же, зато мощностей для выполнения запросов будет больше.
Когда будет нужно выполнять расчёты на основе данных из шардированной таблицы, на каждом узле будет прочитано столько строк, сколько там хранится. Координировать вычисления будет та машина, с которой был инициирован распределённый запрос. В ClickHouse в роли инициатора может выступать любая из машин, обладающая схемой кластера. Эта же машина-инициатор будет брать на себя доведение вычислений до конца (стадию reduce в модели MapReduce).
Если запускаем count, тогда по всей таблице должна получиться цифра 500, так как мы вставили в таблицу пятьсот строк. Но ведь таблица реплицирована, а значит, данные на шардах пересекаются на 100%. Как Клик справится с этой ситуацией? Посмотрим!
Движок Distributed
Представляю движок таблиц Distributed! Это непосредственно то, что координирует распределённое выполнение пользовательских запросов. Для распределённых вычислений нужно создать специальную таблицу и работать с ней, а не с локальными. Такую таблицу можно создать на любой машине кластера или на всех, или даже вне кластера (лишь бы в конфиге был объявлен состав того кластера, на который ссылается Distributed).
Если говорить про таблицу с картинки сверху, то в момент обращения к Distributed на каждом шарде кластера my_clh_cluster (первый аргумент движка) должна существовать таблица test.sharding (второй и третий аргументы движка). Именно эти таблицы в итоге будут прочитаны и использованы для ответа на запросы к Distributed. Четвёртый аргумент движка — это ключ шардирования (к нему вернёмся позже).
Хочу, чтобы вас не смущало то, что Distrubuted называется таблицей. Движки таблиц в ClickHouse позволяют подходить к решению множества задач, и задачи далеко не всегда связаны с хранением данных в табличном виде. Например, есть движки для интеграций — ODBC, Kafka, S3. Таблицы на таких движках не хранят данные в табличном виде, хотя обращаться с ними приходится как с таблицами. Так же с Distributed. Создаётся Distributed как таблица, но никакие данные не хранит. Этим она больше похожа на View. Или, скажем, её можно называть интерфейсом. Интерфейсом поверх шардированного кластера, с помощью которого можно делать делать две вещи: инсерты на кластер и селекты из кластера.
Выполняя вставку в Distributed, Клик воспользуется ключом шардирования, чтобы распределить вставляемые данные между шардами своего кластера. Здесь начинает играть роль схема кластера: в зависимости от параметра internal_replication из xml-конфига, из каждого шарда будет выбрана либо одна реплика (для вставки), либо все имеющиеся. В наших шардах сейчас по одной реплике, так что нам разницы нет.
Получив запрос на чтение, Distributed разделит запрос на фрагменты и направит фрагменты выполняться на распределённые узлы (выберет по одному узлу из каждого шарда, согласно настройке load_balancing). Затем получит результаты, осуществит финализацию вычислений и отдаст ответ пользователю.
Теперь обратимся к созданной Distributed и сделаем count. Получится… 2500! Просим с детализацией до хоста:
На каждом хосте по 500 строк. Это некорректный результат с точки зрения данных, хотя он отражает реальное положение дел на кластере. Иногда мы не почувствуем по результату запроса, если что-то не так с шардированием: например, количество уникальных нумберов не изменится, даже если записать таблицу трижды. Среднее значение по столбцу тоже не изменится. А вот count и sum — изменятся. Проблемы с распределением данных между инстансами (как мы сейчас себе создали проблему, работая с репликами как с шардами!) тяжело обнаружить. Такие проблемы скрываются: явных ошибок нет, а результаты запросов вроде и близки к правде, но неверные.
Надеюсь, что на этом примере удалось показать, что Клик автоматически не обеспечивает объявленного в конфиге шардирования. Он не занимается кластером. В виде движка Distributed он предлагает интерфейс для распределённых вставок и вычислений согласно схеме вашего кластера. Этот интерфейс накладывается поверх самостоятельных, живущих своей жизнью Кликов, с которыми можно продолжать работать как с single-node’ами и после того, как у вас появятся таблицы Distributed. Если вы собираетесь делать MPP в Клике, именно вам предстоит отвечать за шардирование данных.
Вставки на кластер
Можно (и даже рекомендуется) вставлять данные напрямую в шарды — на стороне бекенда определять место хранения строчки, и направлять вставку сразу на нужный шард.
Но можно вставлять и в Distributed. Distributed будет определять место хранения строчки, рассчитывая для каждой значение ключа шардирования. Ключ шардирования — это четвёртый аргумент в движке. Ключ шардирования — выражение, значение которого можно посчитать для каждой строчки в таблице.
В той таблице, которую мы создали, ключ шардирования — просто значение из колонки. От него таблица Distributed возьмёт остаток от деления на суммарный вес всех шардов. По умолчанию вес каждого шарда равен единице, так что, с нашими пятью шардами, делить будем на пять. Остаток 0 будет говорить о том, что строчка ляжет на первый шард. Остаток 1 — что на второй, и так далее.
Ключ шардирования обязан отдавать положительное число. Если бы в таблице не было подходящего поля (или если бы значения в нём были плохо и неравномерно распределены), в качестве ключа шардирования можно было бы брать хэш от одного или нескольких колонок.
Давайте представим, что я пересоздал таблицу test.sharding на всех пяти хостах, убрав репликацию. Сейчас она пустая. Distributed-таблицы — такие же, ключ шардирования — колонка number. Я вставляю 500 строк. Посмотрим, как они распределились.
По сто строк на шард, в сумме пятьсот. На первом шарде оказались строчки, у которых ключ шардирования кратен количеству шардов: 5, 10, 15… Вот теперь похоже на настоящее шардирование!
Если бы это были не просто числа, а идентификаторы клиента, такое шардирование гарантировало бы, что все строки, относящиеся к клиенту с идентификатором, к примеру, 118452, будут лежать на третьем шарде и никаком другом. Мы имели бы локальность данных по клиенту. Из локальности данных можно извлекать выгоду во время SELECT-ов.
Теперь перельём данные INSERT SELECT’ом, постаравшись сохранить шардирование. Пусть второй таблицей будет test.same_sharding — такая же по схеме и шардированию, что и test.sharding.
Сначала пишу так, как любой человек напишет. Просто INSERT SELECT. Пользуюсь именами таблиц Distributed. Если я, работая на третьем хосте, напишу имена локальных таблиц, то перелью данные только на третьем хосте, а не на всех пяти.
Сначала выполняется селект. На шарды уходит запрос на селект, хост-инициатор дожидается результатов, выкачивает их себе и тут же вставляет в свою (локальную) Distributed таблицу.
Distributed для каждой строчки вычисляет ключ шардирования и направляет запросы на вставку на шарды. Данные снова распределяются по кластеру.
Вы, наверное, нет-нет да и задумаетесь — так ли нужно перешардировать заново, если можно просто повторить шардирование таблицы-источника?!
Есть способ выполнить параллельный инсерт (локально и без перешардирования). С сеттингом parallel_distributed_insert_select=2 вставка будет происходить по-другому. Инициатор, закидывая запрос на шарды, подготовит его таким образом, чтобы прямо там, на шардах, и произошла переливка из одной локальной таблицу в другую локальную. Дождётся ответа от каждого шарда и сообщит пользователю, что всё сделано.
Вариант без сеттинга — дефолтный вариант — такой, где инициатор берёт на себя дополнительную нагрузку, зато вы гарантированно получаете корректный результат. В таблице назначения данные будут расшардированы корректно независимо от того, как с этим обстояли дела в таблице-источнике.
Указывая сеттинг, мы берём на себя ответственность за результат. Если бы в таблицах-источниках данные остались полностью реплицированными — то в итоговых таблицах полная репликация повторилась бы.
ClickHouse при любом случае исходит из того, что полагаться на шардирование нельзя (ведь шардирование — зона ответственности пользователей), и лучше поработать подольше, чем отдать некорректный результат. Но вот будь у нас сейчас таблица-источник размером в терабайт, мы бы этот терабайт, не воспользовавшись сеттингом, передавали бы по сети туда-обратно вообще без необходимости.
Матвьюхи и перешардирование (плохой пример)
Теперь хочу рассказать поучительную историю из нашей жизни. Это будет история про матвьюхи (Materialized View, триггер на вставку) и перешардирование.
У нас, как и у многих, несколько слоёв в хранилище: слой сырых данных и слой подготовленных данных. А ещё у нас, как и у многих, есть необходимость хранить одну таблицу несколько раз с разными сортировками. После того, как мы начали работать с кластером, необходимость обострилась ещё больше, потому что шардирование, как и сортировку, для таблицы можно выбрать только один раз. Поэтому так получается, что одни и те же данные мы часто храним по нескольку раз: сначала в сыром слое, потом с одной сортировкой и шардированием, потом с другой, и перекладыванием данных из одного места в другое занимаются матвьюхи.
Примерная схемка на иллюстрации выше. Все начинается с Кафок — у нас по консьюмеру на каждом шарде. Из кафок матвьюхи вытаскивают данные и ежеминутно пушат в Distributed до 1 млн строк за раз.
Сразу отмечу, у нас отключены вставки через бекграунд.
Что это такое? Клик может вставки в Дистрибутед складывать на диск на инициаторе и отпускать пользователей с ok-ом. Отдельный процесс в фоновом режиме будет такие вставки подхватывать и распределять по шардам.
Мы так не захотели делать, потому что готовы ждать записи на каждый шард. У нас небольшой кластер в одном датацентре, поэтому мы не сильно замедлимся, если захотим дождаться вставки в каждый.
Кроме того, нам важно понимать, что если вставка прошла, значит, данные оказались именно там, куда их вставляли, и они уже доступны для пользователей. Наконец, неохота мониторить фоновые процессы по вставкам, потому что если уж они включены — их обязательно надо мониторить. Так что вариант со вставками в бекграунде есть, но мы решили, что он не для нас (хотя в ситуации, про которую я буду рассказывать, они могли бы отчасти помочь).
Итак, консьюмер на любом из шардов записывает прочитанные данные раз в минуту. Когда это происходит, матвьюха делает инсерт в Distributed, а Distributed тут же распределяет инсерты по кластеру и ждёт, пока они выполнятся.
На какой-нибудь машине кластера в этот момент происходит запись в одну из таблиц. Триггерится ещё одна матвьюшка, уже та, которая лежит на первом слое таблиц и хочет перетащить данные во второй. При вставке в следующую Distributed данные делятся по ключу шардированию ещё на пять кусочков.
Каждый шард вставляет в собственную дистрибутед. Если нарисовать весь кластер в работе, то это будет выглядеть так:
Один сброс данных из Кафки приводит к тому, что на каждую таблицу из второго слоя приходится пять вставок. Теперь представьте, что там впереди ещё один слой таблиц (и ещё одно перешардирование):
Пять вставок — это пять отдельных триггеров матвьюхи. Каждый из пяти кусочков с данными (а они уже довольно-то мелкие после двух перешардирований) будет разделён ещё на пять частей. Таблицы из третьего слоя примут уже по 25 вставок:
Если допустим, что в среднем вставка проходит в три партиции — тогда каждая из 75 одновременных вставок создаст в таблице три новых куска. Получается, что всего на один сброс данных из Кафки приходится по 75 новых кусков в каждой таблице из третьего слоя! Неожиданно!
У нас пять консьюмеров, а не один. В минуту будет не один сброс данных из Кафки, а пять. Значит, за минуту в таблице на третьем слое появится не 75, а почти 400 новых кусков. 400 кусков в минуту — это ужасно! Это как вставлять 400 раз за минуту, а ведь частые вставки — один из самых больших антипаттернов для ClickHouse!
Клик запретит новые вставки в таблицу, когда в ней накопится 3000 активных кусков. А искусственно притормаживать вставки начнёт после 1000 кусков, чтобы выиграть себе побольше времени на слияния.
В нашем сетапе, где вставки в Distributed синхронные, самая первая вставка ждёт, пока не отработает самая последняя матвьюха, так что в моменте все эти запросы висят одновременно, включая те, которые Клик притормаживает искусственно. В моменте это приводит к всплескам по количеству запросов. Если вдобавок реплицировать те таблицы, которые захлёбываются в новых кусках, можно всерьёз озадачить Keeper’а. Ему в очередь репликации посыпется вал мелких задач. Когда подобных реплицированных таблиц наберётся много, репликация начнёт запаздывать.
Так что последовательные цепочки матвьюх с перешардированием — это игры с огнём. И чем больше в кластере шардов, тем дороже они стоят. Если в кластере будет 20 машин, даже два последовательных перешардирования будет трудно себе позволить, ведь это будет выливаться в 400+ одновременных вставок.
Мы вынесли из этой ситуации такие уроки:
-
не следует перешардировать по цепочке. Лучше от одной таблицы на первом слое запустить сразу все матвьюхи. Пусть матвьюхи хватают свой батч с данными до перешардирования (пока он ещё один), а не после (когда он уже разделен на 5 частей). Это нетрудно реализовать — триггер на вставку может срабатывать и на вставки в Distributed.
-
незачем иметь по консьюмеру на каждом шарде, если хватает одного консьюмера. Чем меньше вставок — тем лучше.
Агрегатные функции
Можно селектить из дистрибутед таблицы так же, как вы привыкли селектить из обычных таблиц, не чувствовать никакой разницы и не интересоваться ничем из того, что там происходит за кулисами. За кулисами Клик будет переносить на шарды настолько много работы, насколько это возможно без допущений насчёт шардирования. Всё будет в порядке, просто не всегда запрос будет выполняться самым оптимальным образом из возможных.
План выполнения распределённого запроса в Клике в большинстве случаев укладывается в простую схему, которую можно проиллюстрировать на примере вычисления любой агрегатной функции.
Если вы выполняете запрос над распределённым источником, то читать с диска, как ни крути, нужно на нескольких удалённых узлах. Вычисление начнётся там же, где было чтение — на шардах.
Клики на шардах рассчитают промежуточный результат агрегатной функции и отдадут иницатору промежуточный результат (или, другими словами, стейт). Есть специальный тип данных, предназначенный для сериализации и хранения промежуточного результата вычисления агрегатных функций — AggregateFunction. Этот тип данных часто используют с движками AggregatingMergeTree и SummingMergeTree. Для каждой агрегатной функции за этим AggregateFunction скрывается что-то своё: для uniq промежуточное состояние — это хэш-таблица. Для count — одна цифра. У каждой функции свой стейт. Шарды передают на инициатора стейты, а инициатор их мёржит — финализирует 5 промежуточных вычислений. Это можно увидеть на плане запроса, который сейчас перед вами.
Если мы добавим WHERE, он выполнится на шардах. Колонка, по которой фильтруем, есть в локальных таблицах. Локальные таблицы лежат на пяти узлах, и там же сразу после чтения Клики отфильтруют строки по number-у. Это логично — нет никакого смысла тянуть с фильтрацией.
Но если на основе результатов предыдущего запроса (с count-ом или какой-угодно другой агрегатной функцией), тогда расчёты будут происходить целиком на инициаторе, потому что финализация перед этим происходила на инициаторе.
Иначе говоря, подзапрос с результатом count-а будет уже не распределённым источником, как Distributed таблица, а вполне себе локальным, и всё, что вы дальше планировали делать с этим подзапросом, не будет параллелиться. Можно утверждать, что если во FROM — подзапрос из распределённого источника, то результат подзапроса будет иметься только на инициаторе.
Может показаться неожиданным, но утверждение выше верно не только для таких подзапросов, где в SELECT — агрегатные функции, а вообще для любых подзапросов.
Обратите внимание на картинку сверху. На ней два одинаковых запроса, единственное отличие между которыми — секция FROM. С левой стороны во FROM-е — подзапрос, но в этом подзапросе нет агрегатных функций. Роль инициатора в таком подзапросе — объединить поток данных от распределённых узлов. Никакого мёржа он не осуществляет. И всё-таки мы имеем, что после FROM-а всё, что написано, выполняется только на одной машине, а не параллельно на всём кластере.
Это подтверждается именами хостов в выводе запросов. Чуть попозже я расскажу, как тут можно выкрутиться, а сейчас хочу поделиться несколькими способами отслеживать выполнение распределённого запроса (чтобы подобные ситуации вы могли у себя сами отлавливать).
Самое простое — расставить по интересующим вас местам в запросе функцию hostName или shardNum. Функция hostName возвращает имя хоста, на котором выполняется эта функция. Запрос справа выводит имена нескольких хостов, потому что селект выполняется на нескольких хостах параллельно. А когда фрагмент кода выполняется на инициаторе, видно только имя инициатора.
Чуть посложнее — посмотреть на план запроса. Подойдёт и EXPLAIN PLAN, и EXPLAIN PIPILINE. Обе команды отображают в своём плане этап, который называется Union. Union — это объединение данных, пришедших из распределённых узлов, практически как через UNION ALL. Всё, что по дереву ниже UNION-а, выполняется параллельно, что выше — на инициаторе.
Для EXPLAIN PLAN-а будет лучше, если инициатор сам является одним из шардов, потому что тогда в плане будет подробное объяснение этапов ниже UNION-а.
Мы можем, сравнивая два плана, увидеть, что стадия CreatingSets, то есть создание множества для правой части IN-а, происходит после UNION-а, когда во FROM подзапрос. И она же происходит до UNION-а, то есть на шардах, когда во FROM — распределённая таблица.
Для третьего способа дебага нужно разок выполнить запрос так, чтобы в выводе получить query_id первоначального запроса. Если у вас есть этот query_id, вам ничего не стоит по query_log-у на кластере проанализировать, кто чем занимался.
Напомню, инициатор разбивает пользовательский запрос на «фрагменты», более мелкие запросы, их переписывает и затем оркестрирует их выполнение. Лог выполнения этих запросов можно найти в system.query_log-е по общему для всех них initial_query_id.
Как-то так можно запросить данные с query_log-ов: запросить по всему кластеру, сгруппировать по хэшу от очищенного от литералов запроса и посмотреть, сколько уникальных хостов который фрагмент выполняли. Кстати, там же, в query_log-е, можно посмотреть сеттинги, с которыми выполнялись запросы.
Видно, что от левого запроса на шардах выполнялся только SELECT FROM. А от правого — SELECT FROM WHERE IN. Правый запрос лучше параллелизован. Кроме того, в первом случае IN не пробросился на шарды, и шарды потеряли возможность воспользоваться индексами при чтении из таблицы. Тогда как правый запрос подтягивает индексы на шардах, левый — ничего не подтягивает, и таблицу приходится читать и перекачивать на инициатора целиком.
GROUP BY
Теперь, когда мы это знаем, давайте добавим GROUP BY в запрос. Судя по плану, GROUP BY выполняется в два действия: сначала Aggregating, потом MergingAggregated. Aggregating — до UNION’а, на шардах, а MergingAggregated — после UNION’а, на инициаторе. Это напоминает выполнение агрегатных функций (когда шарды считают стейт, а инициатор делает мёрж). Так и есть, ведь группировка и заключается в вычислении агрегатных функций несколько раз для нескольких групп.
В запросе написан count с группировкой по десяткам. На шардах записано каждое пятое число. Из десяти чисел на каждом отдельном шарде будут только два. Поэтому группировку надо финализировать: слить пять локальных группировок, где на каждые десять чисел count равен двум, в одну «глобальную», где на каждые десять чисел count будет равен десяти.
Объединение локальных группировок в плане называется MergingAggregated. Этот этап выполнения запроса уже не параллелится.
Если ключ группировки совпадает с ключом шардирования, тогда вы наверняка захотите избежать доаггрегации на инициаторе.
Сейчас на экране уже другая группировка. Такую группировку нет смысла финализировать на инициаторе. Ведь те группы, которые сформировались на одном шарде, не пересекаются с группами на других шардах. Локальный результат группировки уже можно рассматривать как окончательный. Чтобы не делать лишней работы с финализацией GROUP BY’я, можно пользоваться сеттингом distributed_group_by_no_merge.
С этим сеттингом план запроса сильно изменится. Стоит его добавить — и после Union’а уже ничего не происходит — только перенаправление результатов локальных группировок пользователю. Инициатор лишь объединяет потоки с шардов. Стало лучше!
Функционал, заложенный в distributed_group_by_no_merge, выходит за рамки одних GROUP BY’ев. Эта настройка имеет кардинальный эффект, который не ограничивается одними группировками. Эта настройка вообще отключает любую финализацию на инициаторе. GROUP BY — только один из случаев. Многим операциям на кластере при неподходящем шардировании нужна финализация.
Отключение финализации часто влияет на количество строк в выводе.
Выше мы считаем uniqExact с отключённым мержем на инициаторе. Каждый шард посчитал свой uniqExact (а не стейт для uniqExact). Шардов пять, поэтому строк в выводе пять. Если бы не настройка — была бы одна.
Это необычно, но предсказуемо. Теперь можно просуммировать пять чисел и получить верный ответ, ведь шардированием мы гарантировали локальность по нумберу. Просуммировав уников, мы избежали перекачки толстых стейтов и избежали финализации, которая в иных случаях отняла бы и времени много, и оперативной памяти.
Если в вашем запросе несколько несвязных подзапросов с распределёнными таблицами, тогда выполнятся они будут друг от друга отдельно (в разных фрагментах). Пользуйтесь тем, что сеттинги можно указывать внутри подзапроса. Тогда они действуют только на том уровне вложенности, на котором объявлены, и на всех вложенных (более глубоких). Если на каком-то уровне вложенности оказывается несколько конфликтующих значений сеттингов, выбирается ближайший объявленный.
На картинке сверху — пример. У нижнего GROUP BY’я есть сеттинг. А у верхнего нет. Верхний GROUP BY мог бы забрать сеттинг у родительского запроса, но и у родительского запроса сеттинга тоже нет, так что для них двоих этот сеттинг не определён. Поэтому GROUP BY в верхнем подзапросе выполняется не так же, как GROUP BY в нижнем. Селект из верхнего подзапроса финализируется на инициаторе, а селект из нижнего — на шардах. Это видно по хостнеймам, которые вывел запрос.
Значение сеттинга можно задать на уровне профиля (SETTINGS PROFILE), из-под которого работает пользователь. Для задания через профиль больше подойдёт настройка optimize_distributed_group_by_sharding_key вместе c optimize_skip_unused_shards (но только если вы уверены в корректном шардировании данных по всему кластеру).
Если у вас несколько уровней вложенности, не забывайте про то, что параллелится всё вплоть до первого подзапроса во FROM-е:
Если, когда мы подходим к GROUP BY, данные есть только на инициаторе (как стало сейчас в нижнем подзапросе), ничего запараллелить уже не получится.
Иначе говоря, настройка distributed_group_by_no_merge сработает только для самого «глубокого» из вложенных подзапросов. А если в самом глубоком нет GROUP BY’я, то настройка ничего не сделает, как если бы её вовсе не указывали.
Есть такие ситуации, когда вы не можете избежать подзапроса во FROM, или когда у вас две группировки друг за другом, и обе вы хотите выполнить локально. Для таких ситуаций есть трюк. Он заключается в том, чтобы на каждом шарде создать вьюшку и положить внутрь вьюшки весь тот код, который вы бы хотели выполнить локально.
Потом, поверх вьюшки — таблицу Distributed. Distributed работает поверх чего угодно, что работает как таблица, даже поверх другой Distributed. Distributed может смотреть и на вьюшку тоже. Обращение к такой Distributed будет подразумевать выполнение кода из вьюшки на каждом шарде.
Сейчас на экране именно это, только я не стал создавать постоянные таблицы, а вместо этого воспользовался табличными функциями. Табличная функция cluster эмулирует таблицу на движке Distributed, view — вьюшку.
Внутри вьюшки, обратите внимание, уже нет смысла обращаться к Distributed-таблице, потому что код внутри вьюшки и так выполнится на каждом сервере. Сеттинги, которые вы укажете на верхнем уровне вложенности, пробросятся во вьюшку без каких-либо проблем. С точки зрения планировщика вывод функции cluster будет уже не чем-то локальным (как подзапрос из таблицы Distributed), а распределённым источником, из-за чего и GROUP BY будет распределенным. А если GROUP BY распределённый, значит, для него можно отключить мерж, что и сделает сеттинг distributed_group_by_no_merge.
JOIN, WITH
Добавим к запросу распределённый JOIN и… сразу столкнёмся с ошибкой. ClickHouse не решается приступить к распределённому JOIN’у (то же самое касается IN’а), пока вы не укажете явным образом, какой стратегией соединения воспользоваться. Таких стратегий три штуки:
‘local’ — это самый предпочтительный вариант. Это когда инициатор, переписывая фрагменты запроса для шардов, делает так, чтобы локальная таблица на каждом сервере соединялась с локальной. Этот вариант можно себе позволить, если таблицы были одинаково шардированы и соединяются по тому же ключу, по которому шардированы.
‘global’ и ‘allow’ предполагают перемещение данных по кластеру. Практически — это shuffle, только ClickHouse не перераспределяет данные между узлами, а обеспечивает каждый узел полным набором данных. Оба варианта подразумевают, что вы будете много данных передавать по сети.
Режимы ‘global’ и ‘local’ можно активировать по-другому (не через сеттинг). Сейчас на экране три варианта того, как синтаксически корректно написать распределённый джойн без сеттинга.
-
Чтобы выполнить ‘local’, не указывая сеттинг, укажите справа от JOIN’а (IN’a) имя локальной таблицы. Тогда каждый шард возьмёт свою локальную таблицу.
-
Чтобы выполнить ‘global’, не указывая сеттинг, вместо IN/JOIN напишите GLOBAL IN/GLOBAL JOIN.
-
Если справа от GLOBAL IN/GLOBAL JOIN указать не распределённую, а обычную таблицу, то вы растиражируете по шардам не селект из распределённого источника, а селект из локальной таблицы на инициаторе. Так, например, можно передавать временные таблицы, созданные запросом CREATE TEMPORARY TABLE, так как они в любом случае создаются только на одной машине — той, где выполняется CREATE-запрос.
Теперь давайте поразбираем запрос, изображённый выше.
Перемещение данных, связанное с GLOBAL IN/JOIN, не прерывает параллельного выполнения запроса.
«Глобальные» (собранные со всего кластера) данные по итогу перемещаются на шарды. Шарды дожидаются этих данных и далее работают параллельно. Если в запросе есть сеттинги, которые направлены на то, чтобы продлить параллельную часть, они продолжат работать, несмотря на GLOBAL. В запросе, изображённом ваше, оба сеттинга сработают. Просто для выполнения потребуется первым делом собрать на инициаторе подзапрос из правой части IN-а и доставить его на каждый шард.
То, что пропушивается на шарды во время выполнения GLOBAL, обёрнуто во временную таблицу (такую же, какой вы сами можете воспользоваться, когда отправляете запросы к Клику через HTTP/TCP интерфейсы).
Это не какая-то специальная структура данных — не дедуплицированное множество для IN’а и не хэш-таблица для JOIN’а. Если вы создадите таблицу на движке Set (который вроде бы специально предназначен для IN-а), и попробуете передать её через режим GLOBAL, то этого не получится сделать — данные из движка Set не оборачиваются во временную таблицу. Поэтому если справа от GLOBAL у вас много дублей, от них порой будет лучше явным образом избавиться (например, через DISTINCT), иначе их все придётся передавать по сети.
У того, что данные перемещаются во временных таблицах, есть и другие последствия. Пусть, например, вы хотите воспользоваться для джойна алгоритмом full_sorting_merge, и рассчитываете, что стадию сортировки получится пропустить благодаря тому, что физическая сортировка таблиц совпадает.
Это произойдёт только в том случае, если джойн локальный. Клик сможет опустить сортировку в том случае, если у него будет возможность поступательно считывать отсортированные данные с диска. Но если вы правую часть джойна получаете не с диска, а через временную таблицу, которую GLOBAL пушит на шарды, этого не произойдёт.
Настройки и механизмы, которые отталкиваются от физической сортировки строк в таблице, не будут действовать для таблиц, которые получены по сети от удалённого узла, а не прочитаны с диска. Хочу показать план выполнения запроса, который сейчас на экране.
Тут предлагаю обратить внимание на то, что:
-
запрос выполняется параллельно — Union оказался в самом верху;
-
условие WHERE number GLOBAL IN применилось для поиска по индексам, несмотря на GLOBAL;
-
условие к правой части джойна применилось и к левой, и наоборот. Сработала специфичная оптимизация для INNER JOIN.
Мы не теряем никакие оптимизации просто от того, что пользуемся Distributed-таблицами и необычным GLOBAL IN-ом. Все приёмы оптимизации, которые работают на одном инстансе ClickHouse, продолжают работать на кластере. Просто на кластере вы к таблицам обращаетесь не напрямую, а через своеобразное прокси в виде Distributed.
Тем не менее, если бы мы выполняли тот же JOIN не в локальной, а в глобальной стратегии, правая часть джойна была бы представлена в виде временной таблицы, и тогда всё было бы иначе.
Временные таблицы не имеют индексов. Воспользоваться индексами таблицы same_sharding получится только в том случае, если написать с правой стороны JOIN’а подзапрос, и туда, внутрь подзапроса, переместить условие, которое позволит воспользоваться индексом таблицы. Но забрать условие WHERE, обращённое к индексам левой таблицы, для чтения по индексам правой, как это получилось в локальном INNER JOIN’е, в глобальном — никак не выйдет.
Ещё одна разновидность перемещения, о которой стоит упомянуть — перемещение на шарды скалярных значений, необходимых шардам для выполнения запроса. Например, как в запросе сверху написан скалярный CTE.
Перемещение скаляра происходит не во временной таблице. Скаляр напрямую вписывается во фрагменты запроса для шардов. На изображении сверху — то, как выглядит распараллеленный фрагмент запроса. Видно, что в него значение скаляра вписано прямо литералом (на последней строчке). Такое перемещение ничего не стоит в плане передачи данных по сети, но, тем не менее, выполнение запроса блокируется до тех пор, пока не будет получен скаляр. Даже EXPLAIN PLAN подвиснет, потому что невозможно составить план запроса, пока не сформулирован сам запрос.
Такой запрос, если хотите, можно переписать так, чтобы в нём не было блокировки (скорее для упражнения, чем для скорости) — для этого можно скалярную CTE заменить на обычную, которая просто раскроется внутри запроса, и заменить знак равенства на IN. Оператор сравнения (равенство) разделит запрос на два фрагмента, один из которых заблокируется в ожидании другого. Зато, если мы напишем IN и выберем локальную стратегию соединения, весь запрос получится выполнить одним фрагментом. А когда каждый шард отдаст строчки с локальными минимумами, можно будет на инициаторе оконной функцией (поверх нескольких десятков строк) оставить только такие, которые содержат глобальный минимум. Заодно будет повод попробовать классный, недавно добавленный QUALIFY!
Заключение
Чтобы писать оптимальные запросы к шардированному кластеру в ClickHouse, прежде всего, надо интересоваться тем, как они выполняются. Эксперементировать с сеттингами. Про Клик можно узнать много нового, если внимательно пролистать список сеттингов с описаниями. Ещё я бы посоветовал следить за релизами, потому что в ClickHouse всё стремительно меняется, и статья со временем будет понемногу терять актуальность.
ссылка на оригинал статьи https://habr.com/ru/articles/896060/
Добавить комментарий