Продолжаем тему потери данных в ClickHouse. Рекомендую прочитать первую статью цикла, в которой раскрывается проблема атомарности INSERT и дедупликация (и, конечно же, решение проблемы потери данных). А тема текущего обсуждения — буферные таблицы и то, как они теряют данные.
P.S. читать каждую ссылку из статьи совсем не обязательно, основные тезисы из ссылок изложены в статье.
1. Введение
Как известно, ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи — он любит, когда делают одну вставку на 1 000 000 записей. Об этом сказано в официальной документации, а еще об этом рассказывал лично Алексей Миловидов (основатель ClickHouse) на одной из конференций, стенограмму которой можно почитать в статье на Хабре. Краткая выжимка из статьи: все кейсы (кроме Kafka), которые в ней описаны — ненадежные способы решения проблемы батчинга данных перед вставкой. Как правило, все эти методы сводятся к сбору информации от 10 000 операций INSERT в некий буфер, который будет раз в секунду делать INSERT в ClickHouse. Нас же из всего этого интересует именно то, как сам ClickHouse может потерять данные и как этого избежать, поэтому рассматривать будем именно способ с буферными таблицами.
Итак, описываем кейс: у вас есть стримы, генерирующие большое количество вставок данных в ClickHouse в секунду (условно 10 000 операций INSERT в секунду). Вы натравили стрим на обычную MergeTree и получаете следующее (цитата из выступления Алексея Миловидова): «Что будет, если делать плохо? Вставляем по одной строке в таблицу MergeTree и получается 59 строк в секунду. Это в 10 000 раз медленнее. В ReplicatedMergeTree – 6 строк в секунду. А если еще кворум включится, то получается 2 строки в секунду. По-моему, это какой-то кромешный отстой. Как можно так тормозить? У меня даже на футболке написано, что ClickHouse не должен тормозить. Но тем не менее бывает иногда.»
Помимо невероятного торможения, конечно же, получим еще и перегруженность железа: под капотом отрабатывает слияние кусков данных (собственно поэтому движки таблиц и имеют в названии Merge, где есть Merge — там присутствует процесс слияния данных). Проще говоря: каждая операция INSERT в таблицу, содержащую в названии движка *Merge*, вставляет данные не в файл целевой таблицы, а в небольшие кусочки данных, хранящиеся рядом с основным файлом таблицы. И под капотом с определенным интервалом, задающимся настройкой max_merge_selecting_sleep_ms, запускается процесс слияния всех кусков данных в один файл данных (как правило, есть крупный файл таблицы/партиции и мелкие файлики от кучи инсертов, которые мержатся с этим крупным файлом и удаляются). Именно процесс слияния и будет убивать производительность вашего ClickHouse.
Пытаемся решить проблему. Нам стало очевидно, что данные надо как-то собирать в крупные пачки перед вставкой. Находим простое решение из коробки — буферные таблицы (это ведь даже звучит ровно как то, что нужно).
2. Буферные таблицы
Идея буферной таблицы очень простая — она пишет данные для записи в оперативную память, периодически сбрасывая их в целевую таблицу. Условия для сброса данных вы пишите сами, например: либо по истечении 100 секунд с момента предыдущего сброса данных на диск, либо по накоплению миллиона записей, либо по накоплению 1ГБ данных. Очень удобно + из коробки. 10 000 операций INSERT попадут не в файлы на диске (которые потом еще и слить надо с основной таблицей), а в оперативную память, и затем в один-единственный файл на диске вместо кучи файлов, что существенно снизит нагрузку при операции слияния.
Потеря данных произойдет, например, при падении ClickHouse. Все, что было в оперативной памяти, будет утеряно, включая те данные в буферных таблицах, которые еще не успели сброситься на диск.
Как же не потерять данные? Самый правильный, на мой взгляд, способ — использовать Kafka (как бы не хотелось добавлять еще одну распределенную систему) в связке с Kafka Engine. Данный способ хорошо зарекомендовал себя на рынке.
3. Kafka Engine
В рамках статьи, конечно, не получится рассказать обо всем. Поэтому оставлю ссылки на полезные статьи, которые позволят теоретически освоить как Kafka, так и Kafka Engine.
-
ClickHouse® Kafka Engine Tutorial — Altinity | Run open source ClickHouse® better
-
ClickHouse® Kafka Engine FAQ — Altinity | Run open source ClickHouse® better
А мы же сконцентрируемся именно на решении проблемы потери данных. Нам нужно доказательство атомарности технологии Kafka Engine. Для этого подготовил демонстрационный кейс, рассмотрим его.
Конфигурация максимально простая: докер, одна нода Kafka, одна нода Zookeper, одна нода ClickHouse

В Kafka создан тестовый топик с названием test_topic_json, в котором сообщение в формате json с одним полем message и строковым значением (на изображении плохо видно, продублирую текстом)
|
{«message»: «qq»} |
|
{«message»: «bb»} |

В ClickHouse создан классический пайплайн Kafka Engine:
drop table if exists test_kafka_json_parse; create table test_kafka_json_parse ( message String ) engine = Kafka SETTINGS kafka_broker_list = 'kafka:29092' , kafka_topic_list = 'test_topic_json' , kafka_group_name = 'my-group4' , kafka_format = 'JSONEachRow' , kafka_max_block_size = 2097152 ; drop table if exists message_json_parse; create table message_json_parse ( message String ) engine = MergeTree order by message ; drop view if exists test_kafka_json_parse_view; create materialized view test_kafka_json_parse_view TO default.message_json_parse ( `message` String ) AS SELECT message FROM test_kafka_json_parse ;
Что этот пайплайн делает: читает свежие данные из Kafka из топика test_topic_json . Коннект осуществляется благодаря таблице test_kafka_json_parse, материализованное представление test_kafka_json_parse_view является консьюмером, message_json_parse — конечная таблица, в которую мат вьюха (консьюмер) пишет данные.
Реализовав все выше изложенное, получаем следующий результат: в конечной таблице видим оба сообщения, присутствующие в Kafka

Также видим следующе значение offset для консьюмера my-group4 (продублирую текстом — всего сообщений 2, offset тоже равен 2):

Получается, что консьюмер my-group4 вычитал полностью топик test_topic_json, что абсолютно верно.
Теперь необходимо доказать атомарность. Для этого пересоздадим конечную таблицу message_json_parse таким образом:
drop table if exists message_json_parse; create table message_json_parse ( message UInt64 ) engine = MergeTree order by message ;
message UInt64 — ключевой момент (UInt64 — строго положительное число). Мы специально ломаем конечную таблицу, чтобы мат вьюха не могла записать в нее данные. Затем закидываем новое сообщение в топик test_topic_json, пускай будет
|
{«message»: «gg»} |
При всем при этом мы НЕ трогаем ни Kafka-таблицу, ни мат вьюху. И вот что получаем в таком случае:

Видим, что теперь для консьюмера my-group4 Offset не равен End. Сообщений в топике 3, но смещение у консьюмера равно 2. Очевидно, что мат вьюха не может «gg» записать в колонку с типом UInt64. Посмотрим на лог ошибок клика, который даст ответы на все вопросы:

Продублирую текстом самое важное и очевидное: Cannot parse string 'gg' as UInt64. А о чем же суммарно с несмещенным offset нам это говорит? — это говорит о том, что offset коммитится только при успешной отработке материализованного представления, что и гарантирует атомарность.
Из кейса выше очевидно, что ClickHouse прочитал сообщение «gg» из топика. Это было важно доказать в контексте потери данных. Мы понимаем, что операция чтения из Kafka была осуществлена, данные физически залетели в ClickHouse (в RAM). Затем происходят бесконечные попытки записать сообщение в таблицу. К новым сообщениям консьюмер, конечно же, не перейдет (хотя и есть настройки, конфигурирующие это — на практике не встречал и не советую).
Далее можно делать что угодно: положить ClickHouse, рестартнуть докер/машину, да хоть удалить ClickHouse и проделать все манипуляции заново, главное — не трогать Kafka, так как она бережно хранит в себе offset для нашего консьюмера.
Ну а мы же поправим ошибку, чтобы убедиться, что после этого сообщение будет успешно прочитано:
drop table if exists message_json_parse; create table message_json_parse ( message String ) engine = MergeTree order by message ;
И закономерный результат:

Также видим закономерный результат и в Kafka (End=3, Offset=3):

Получается, что после решения проблемы чтение продолжилось, очевидно, с последнего offset, и корректная работа была восстановлена БЕЗ потерь данных. Тоже самое будет и в любой другой ситуации: падение сервера, ошибки коллег, отклонение от контракта данных в Kafka и т.д.
P.S. Если есть желание, то можно посмотреть исходный код консьюмера.
4. Заключение
Далеко не факт, что вы столкнетесь с ситуацией «10000 INSERT Per Second». Но всегда помните следующее:
-
Автор большой фанат ClickHouse и хочет, чтобы ClickHouse не только не тормозил, но и не терял данные.
-
ClickHouse не любит, когда в него делают 1 000 000 вставок по одной записи — он любит, когда делают одну вставку на 1 000 000 записей.
-
Все способы буферизации (включая асинхронные вставки) чреваты потерей данных.
-
Kafka Engine — проверенная временем технология, не теряющая данные.
-
Если есть возможность формировать крупные пачки данных надежным способом, отличным от Kafka Engine — то это тоже решение проблемы.
Пользуйтесь на здоровье!
ссылка на оригинал статьи https://habr.com/ru/articles/934320/
Добавить комментарий