ClickHouse не тормозит, но теряет данные. Часть 2 — от буферных таблиц к Kafka Engine

от автора

Продолжаем тему потери данных в ClickHouse. Рекомендую прочитать первую статью цикла, в которой раскрывается проблема атомарности INSERT и дедупликация (и, конечно же, решение проблемы потери данных). А тема текущего обсуждения — буферные таблицы и то, как они теряют данные.

P.S. читать каждую ссылку из статьи совсем не обязательно, основные тезисы из ссылок изложены в статье.

1. Введение

Как известно, ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи — он любит, когда делают одну вставку на 1 000 000 записей. Об этом сказано в официальной документации, а еще об этом рассказывал лично Алексей Миловидов (основатель ClickHouse) на одной из конференций, стенограмму которой можно почитать в статье на Хабре. Краткая выжимка из статьи: все кейсы (кроме Kafka), которые в ней описаны — ненадежные способы решения проблемы батчинга данных перед вставкой. Как правило, все эти методы сводятся к сбору информации от 10 000 операций INSERT в некий буфер, который будет раз в секунду делать INSERT в ClickHouse. Нас же из всего этого интересует именно то, как сам ClickHouse может потерять данные и как этого избежать, поэтому рассматривать будем именно способ с буферными таблицами.

Итак, описываем кейс: у вас есть стримы, генерирующие большое количество вставок данных в ClickHouse в секунду (условно 10 000 операций INSERT в секунду). Вы натравили стрим на обычную MergeTree и получаете следующее (цитата из выступления Алексея Миловидова): «Что будет, если делать плохо? Вставляем по одной строке в таблицу MergeTree и получается 59 строк в секунду. Это в 10 000 раз медленнее. В ReplicatedMergeTree – 6 строк в секунду. А если еще кворум включится, то получается 2 строки в секунду. По-моему, это какой-то кромешный отстой. Как можно так тормозить? У меня даже на футболке написано, что ClickHouse не должен тормозить. Но тем не менее бывает иногда.»

Помимо невероятного торможения, конечно же, получим еще и перегруженность железа: под капотом отрабатывает слияние кусков данных (собственно поэтому движки таблиц и имеют в названии Merge, где есть Merge — там присутствует процесс слияния данных). Проще говоря: каждая операция INSERT в таблицу, содержащую в названии движка *Merge*, вставляет данные не в файл целевой таблицы, а в небольшие кусочки данных, хранящиеся рядом с основным файлом таблицы. И под капотом с определенным интервалом, задающимся настройкой max_merge_selecting_sleep_ms, запускается процесс слияния всех кусков данных в один файл данных (как правило, есть крупный файл таблицы/партиции и мелкие файлики от кучи инсертов, которые мержатся с этим крупным файлом и удаляются). Именно процесс слияния и будет убивать производительность вашего ClickHouse.

Пытаемся решить проблему. Нам стало очевидно, что данные надо как-то собирать в крупные пачки перед вставкой. Находим простое решение из коробки — буферные таблицы (это ведь даже звучит ровно как то, что нужно).

2. Буферные таблицы

Идея буферной таблицы очень простая — она пишет данные для записи в оперативную память, периодически сбрасывая их в целевую таблицу. Условия для сброса данных вы пишите сами, например: либо по истечении 100 секунд с момента предыдущего сброса данных на диск, либо по накоплению миллиона записей, либо по накоплению 1ГБ данных. Очень удобно + из коробки. 10 000 операций INSERT попадут не в файлы на диске (которые потом еще и слить надо с основной таблицей), а в оперативную память, и затем в один-единственный файл на диске вместо кучи файлов, что существенно снизит нагрузку при операции слияния.

Потеря данных произойдет, например, при падении ClickHouse. Все, что было в оперативной памяти, будет утеряно, включая те данные в буферных таблицах, которые еще не успели сброситься на диск.

Как же не потерять данные? Самый правильный, на мой взгляд, способ — использовать Kafka (как бы не хотелось добавлять еще одну распределенную систему) в связке с Kafka Engine. Данный способ хорошо зарекомендовал себя на рынке.

3. Kafka Engine

В рамках статьи, конечно, не получится рассказать обо всем. Поэтому оставлю ссылки на полезные статьи, которые позволят теоретически освоить как Kafka, так и Kafka Engine.

визуализация пайплайна данных

визуализация пайплайна данных

А мы же сконцентрируемся именно на решении проблемы потери данных. Нам нужно доказательство атомарности технологии Kafka Engine. Для этого подготовил демонстрационный кейс, рассмотрим его.

Конфигурация максимально простая: докер, одна нода Kafka, одна нода Zookeper, одна нода ClickHouse

В Kafka создан тестовый топик с названием test_topic_json, в котором сообщение в формате json с одним полем message и строковым значением (на изображении плохо видно, продублирую текстом)

{«message»: «qq»}

{«message»: «bb»}

В ClickHouse создан классический пайплайн Kafka Engine:

drop table if exists test_kafka_json_parse; create table test_kafka_json_parse (     message String )     engine = Kafka     SETTINGS kafka_broker_list = 'kafka:29092'         , kafka_topic_list = 'test_topic_json'         , kafka_group_name = 'my-group4'         , kafka_format = 'JSONEachRow'         , kafka_max_block_size = 2097152 ;  drop table if exists message_json_parse; create table message_json_parse (     message String ) engine = MergeTree order by message ;   drop view if exists test_kafka_json_parse_view; create materialized view test_kafka_json_parse_view TO default.message_json_parse             (              `message`  String                 ) AS SELECT message FROM test_kafka_json_parse ;

Что этот пайплайн делает: читает свежие данные из Kafka из топика test_topic_json . Коннект осуществляется благодаря таблице test_kafka_json_parse, материализованное представление test_kafka_json_parse_view является консьюмером, message_json_parse — конечная таблица, в которую мат вьюха (консьюмер) пишет данные.

Реализовав все выше изложенное, получаем следующий результат: в конечной таблице видим оба сообщения, присутствующие в Kafka

Также видим следующе значение offset для консьюмера my-group4 (продублирую текстом — всего сообщений 2, offset тоже равен 2):

Получается, что консьюмер my-group4 вычитал полностью топик test_topic_json, что абсолютно верно.

Теперь необходимо доказать атомарность. Для этого пересоздадим конечную таблицу message_json_parse таким образом:

drop table if exists message_json_parse; create table message_json_parse (     message UInt64 ) engine = MergeTree order by message ;

message UInt64 — ключевой момент (UInt64 — строго положительное число). Мы специально ломаем конечную таблицу, чтобы мат вьюха не могла записать в нее данные. Затем закидываем новое сообщение в топик test_topic_json, пускай будет

{«message»: «gg»}

При всем при этом мы НЕ трогаем ни Kafka-таблицу, ни мат вьюху. И вот что получаем в таком случае:

Видим, что теперь для консьюмера my-group4 Offset не равен End. Сообщений в топике 3, но смещение у консьюмера равно 2. Очевидно, что мат вьюха не может «gg» записать в колонку с типом UInt64. Посмотрим на лог ошибок клика, который даст ответы на все вопросы:

Продублирую текстом самое важное и очевидное: Cannot parse string 'gg' as UInt64. А о чем же суммарно с несмещенным offset нам это говорит? — это говорит о том, что offset коммитится только при успешной отработке материализованного представления, что и гарантирует атомарность.

Из кейса выше очевидно, что ClickHouse прочитал сообщение «gg» из топика. Это было важно доказать в контексте потери данных. Мы понимаем, что операция чтения из Kafka была осуществлена, данные физически залетели в ClickHouse (в RAM). Затем происходят бесконечные попытки записать сообщение в таблицу. К новым сообщениям консьюмер, конечно же, не перейдет (хотя и есть настройки, конфигурирующие это — на практике не встречал и не советую).

Далее можно делать что угодно: положить ClickHouse, рестартнуть докер/машину, да хоть удалить ClickHouse и проделать все манипуляции заново, главное — не трогать Kafka, так как она бережно хранит в себе offset для нашего консьюмера.

Ну а мы же поправим ошибку, чтобы убедиться, что после этого сообщение будет успешно прочитано:

drop table if exists message_json_parse; create table message_json_parse (     message String ) engine = MergeTree order by message ;

И закономерный результат:

Также видим закономерный результат и в Kafka (End=3, Offset=3):

Получается, что после решения проблемы чтение продолжилось, очевидно, с последнего offset, и корректная работа была восстановлена БЕЗ потерь данных. Тоже самое будет и в любой другой ситуации: падение сервера, ошибки коллег, отклонение от контракта данных в Kafka и т.д.

P.S. Если есть желание, то можно посмотреть исходный код консьюмера.

4. Заключение

Далеко не факт, что вы столкнетесь с ситуацией «10000 INSERT Per Second». Но всегда помните следующее:

  • Автор большой фанат ClickHouse и хочет, чтобы ClickHouse не только не тормозил, но и не терял данные.

  • ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи — он любит, когда делают одну вставку на 1 000 000 записей.

  • Все способы буферизации (включая асинхронные вставки) чреваты потерей данных.

  • Kafka Engine — проверенная временем технология, не теряющая данные.

  • Если есть возможность формировать крупные пачки данных надежным способом, отличным от Kafka Engine — то это тоже решение проблемы.

Пользуйтесь на здоровье!


ссылка на оригинал статьи https://habr.com/ru/articles/934320/