COPY в PostgreSQL: грузим данные быстро, безопасно и без сюрпризов

от автора

Привет, Хабр.

Сегодня разбираем COPY в PostgreSQL. Это рабочая лошадка для массовой загрузки и выгрузки данных.

Что делает COPY и чем он отличается от INSERT

COPY переносит данные между таблицей и файлом или потоками STDIN/STDOUT. Вариант COPY FROM загружает, COPY TO выгружает. Умеет в форматы text, csv, binary.

Поддерживает параметры ON_ERROR, FREEZE, HEADER и HEADER MATCH, FORCE_*, ENCODING, WHERE, а также запуск внешних программ через PROGRAM. Это раза в два быстрее любого батчевого INSERT при равных условиях и заметно проще в эксплуатации.

Минимальные конструкции, которыми пользуемся:

-- выгрузка таблицы в CSV в поток COPY public.events TO STDOUT WITH (FORMAT csv, HEADER true);  -- загрузка из CSV из потока COPY public.events FROM STDIN WITH (FORMAT csv, HEADER true);  -- загрузка из файла на сервере (нужны права) COPY public.events FROM '/var/lib/postgresql/import.csv' WITH (FORMAT csv, HEADER true);  -- через внешнюю программу (распаковка на лету) COPY public.events FROM PROGRAM 'gzip -dc /data/import.csv.gz' WITH (FORMAT csv, HEADER true);  -- выгрузка результата запроса, не всей таблицы COPY (   SELECT id, payload, created_at   FROM public.events   WHERE created_at >= DATE '2025-01-01' ) TO STDOUT WITH (FORMAT csv, HEADER true);

COPY против \copy в psql

Важно разделять два мира. COPY ... FROM/TO 'filename' и COPY ... PROGRAM работают на стороне сервера и требуют специальных ролей. Файлы должны быть доступны именно серверу. Метакоманда \copy из psql это обёртка вокруг COPY ... FROM STDIN/TO STDOUT, и файлы читаются/пишутся на стороне клиента. Если нет серверных прав или файл лежит у вас локально, то используем уже \copy. Если нужно максимальное быстродействие и файлы уже на сервере,то тут уже нужен серверный COPY.

Пример для psql:

-- клиентская выгрузка в сжатый файл \copy (SELECT * FROM public.events) TO PROGRAM 'gzip > /tmp/events.csv.gz' WITH (FORMAT csv, HEADER true)  -- клиентская загрузка \copy public.events FROM '/home/user/import.csv' WITH (FORMAT csv, HEADER true)

Обработка ошибок при загрузке

По дефолту COPY FROM завершится на первой проблемной строке. В ситуациях «загружаем всё, что валидно, остальное в карантин» помогает ON_ERROR ignore:

COPY public.events FROM STDIN   WITH (     FORMAT csv,     HEADER true,     ON_ERROR ignore,     LOG_VERBOSITY verbose   );

При ignore все некорректные строки будут пропущены, в конце придёт NOTICE с количеством пропусков. С LOG_VERBOSITY verbose сервер дополнительно пишет, какая строка и какая колонка упали на конверсии. Это удобно, но не превращаем это в норму на постоянной основе: для чистых пайплайнов лучше staging-таблицы с текстовыми колонками, грузить туда, а затем явной валидацией и приведение типов вносить в целевую структуру.

Пример staging-потока:

-- сырой слой CREATE UNLOGGED TABLE staging.events_raw (   id_text text,   payload_text text,   created_at_text text );  -- без ограничений, чтобы грузить максимально быстро COPY staging.events_raw FROM STDIN WITH (FORMAT csv, HEADER true);  -- чистовой слой INSERT INTO public.events (id, payload, created_at) SELECT   id_text::bigint,   payload_text::jsonb,   created_at_text::timestamptz FROM staging.events_raw WHERE id_text ~ '^\d+$'   AND created_at_text IS NOT NULL ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload,     created_at = EXCLUDED.created_at;

Где смотреть прогресс

Во время работы COPY сервер публикует состояние в pg_stat_progress_copy.

SELECT pid, datname, relid::regclass AS relation, command, type, bytes_processed, bytes_total,        tuples_processed, tuples_excluded, error_count FROM pg_stat_progress_copy;

tuples_excluded растёт, если используется WHERE и строки отбрасываются. error_count полезен при ON_ERROR ignore.

Форматы: когда какой

Три режима покрывают почти все случаи.

Текстовый формат даёт табуляцию как разделитель и \N как NULL. Он есть, но редко нужен: CSV удобнее и предсказуемее, а бинарный быстрее.

CSV — рабочий стандарт для интеграций. Пара нюансов:

  • HEADER true добавляет заголовок на выгрузке и пропускает первую строку на загрузке. Вход можно усилить HEADER MATCH, тогда заголовок обязан совпасть с реальными именами колонок и порядком.

  • Пустая строка и NULL различаются. Пустая строка это "", NULL это пусто без кавычек, либо строка из параметра NULL '...'.

  • Для явного управления кавычками и экранированием используем QUOTE, ESCAPE, а для принудительного заключения некоторых колонок FORCE_QUOTE (col1, col2).

Бинарный формат выгоден для потока Postgres → Postgres одной версии, например при переносе больших объёмов между кластерами в одной инфраструктуре. Версии должны совпадать, типы колонок совместимы, данные не перемещаем между архитектурами, где отличается порядок байтов.

Примеры CSV-выгрузки и загрузки с тонкостями:

-- выгружаем с принудительными кавычками для текстовых колонок COPY public.users (id, email, country) TO STDOUT WITH (FORMAT csv, HEADER true, FORCE_QUOTE (email, country));  -- грузим, требуя строгого совпадения заголовка COPY public.users (id, email, country) FROM STDIN WITH (FORMAT csv, HEADER match);

FREEZE: когда да, когда нет

COPY FROM ... WITH (FREEZE true) замораживает строки сразу, как после VACUUM FREEZE. Это ускоряет первичную загрузку новой или только что очищенной таблицы и уменьшает давление на автovacuum. Условия там строгие: таблица должна быть создана или опустошена в текущей транзакции, без открытых курсоров и конкурирующих снимков. На секционированных таблицах FREEZE сейчас не применяется.

Сценарий:

BEGIN; CREATE TABLE public.import_users (LIKE public.users INCLUDING ALL); COPY public.import_users FROM PROGRAM 'gzip -dc /data/users.csv.gz'   WITH (FORMAT csv, HEADER true, FREEZE true); ANALYZE public.import_users; COMMIT;

WHERE прямо в COPY

На загрузке можно отбрасывать строки по условию.

COPY public.events (id, payload, created_at) FROM STDIN   WITH (FORMAT csv, HEADER true)   WHERE id IS NOT NULL AND created_at >= DATE '2025-01-01';

Количество исключённых строк будет видно в pg_stat_progress_copy.

Безопасность

Если используете COPY ... FROM/TO 'filename' или PROGRAM, нужны привилегии суперпользователя или членство в ролях pg_read_server_files, pg_write_server_files, pg_execute_server_program.

Это осознанное ограничение: сервер получает доступ к файловой системе и shell. В PROGRAM команда запускается через оболочку, поэтому недоверенный ввод встраивать нельзя. Для таких сценариев держите фиксированные строки, белые списки и внимательное экранирование.

Ещё два момента. Для COPY TO путь обязан быть абсолютным. Для COPY FROM это рекомендация, но лучше не полагаться на рабочую директорию кластера. И не забываем про права на таблицы: для COPY TO нужно право SELECT на колонки, для COPY FROM право INSERT.

Кодировки и форматы дат

Если файл не в кодировке клиента, задаем ENCODING '...' в COPY. Для переносимой выгрузки я перед выгрузкой ставим ISO-формат дат:

SET DateStyle TO ISO, YMD; COPY (SELECT * FROM public.events WHERE created_at >= DATE '2025-01-01') TO STDOUT WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');

На CSV все символы значимы, включая пробелы. Если источник дополняет строки пробелами до фиксированной ширины, чистим файл до загрузки. В CSV значение может включать переносы строк, если оно в кавычках.

COPY из кода: Python, Go, Rust

Python, psycopg3

Нормальная потоковая работа делается через cursor.copy(...). COPY не поддерживает параметры, поэтому SQL-строку формируем сами из белого списка идентификаторов.

import psycopg from psycopg.rows import dict_row  def copy_csv_to_table(dsn: str, table: str, csv_path: str):     if table not in {"public_events", "public_users"}:         raise ValueError("table not allowed")     copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)"     with psycopg.connect(dsn, autocommit=False) as conn:         with conn.cursor(row_factory=dict_row) as cur, open(csv_path, "rb") as f:             with cur.copy(copy_sql) as cp:                 while chunk := f.read(256 * 1024):                     cp.write(chunk)         conn.commit()  def copy_table_to_csv(dsn: str, table: str, out_path: str):     if table not in {"public_events", "public_users"}:         raise ValueError("table not allowed")     copy_sql = f"COPY {table} TO STDOUT WITH (FORMAT csv, HEADER true)"     with psycopg.connect(dsn, autocommit=True) as conn:         with conn.cursor() as cur, open(out_path, "wb") as f:             with cur.copy(copy_sql) as cp:                 for data in cp:                     f.write(data)  def copy_between(dsn_src: str, dsn_dst: str, src_query: str, dst_table: str):     # перенос Postgres → Postgres в бинарном формате при совпадающих версиях     sql_out = f"COPY ({src_query}) TO STDOUT WITH (FORMAT binary)"     sql_in  = f"COPY {dst_table} FROM STDIN WITH (FORMAT binary)"     with psycopg.connect(dsn_src) as s, psycopg.connect(dsn_dst) as d:         with s.cursor() as cs, d.cursor() as cd:             with cs.copy(sql_out) as cp_out, cd.copy(sql_in) as cp_in:                 for data in cp_out:                     cp_in.write(data)         d.commit()

Замечания по безопасности: никаких пользовательских фрагментов в строке COPY, только whitelisting таблиц и колонок. Пакуем данные крупными блоками, на маленьких кусках будет лишняя системная возня.

Go, pgx

В pgx есть CopyFrom, который забирает источник строк и шлёт их по протоколу COPY. Это удобнее, чем самому форматировать CSV, и быстрее, чем INSERT.

package bulk  import (     "context"     "encoding/csv"     "io"     "strconv"      "github.com/jackc/pgx/v5" )  func CopyRows(ctx context.Context, conn *pgx.Conn, table string, columns []string, rows [][]any) (int64, error) {     // columns вроде []string{"id", "payload", "created_at"}     return conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(rows)) }  func CopyCSV(ctx context.Context, conn *pgx.Conn, table string, columns []string, r io.Reader) (int64, error) {     dec := csv.NewReader(r)     // пропускаем заголовок     if _, err := dec.Read(); err != nil {         return 0, err     }     const batch = 5000     buf := make([][]any, 0, batch)     var total int64      flush := func() error {         if len(buf) == 0 {             return nil         }         n, err := conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(buf))         total += n         buf = buf[:0]         return err     }      for {         rec, err := dec.Read()         if err == io.EOF {             break         }         if err != nil {             return total, err         }         id, _ := strconv.ParseInt(rec[0], 10, 64)         row := []any{id, rec[1], rec[2]} // пример         buf = append(buf, row)         if len(buf) >= batch {             if err := flush(); err != nil {                 return total, err             }         }     }     if err := flush(); err != nil {         return total, err     }     return total, nil }

Здесь не генерируем CSV сами. pgx упакует всё в протокол COPY.

Rust, tokio-postgres

У клиента есть copy_out и copy_in. Работает потоками, без промежуточных файлов.

use tokio_postgres::{NoTls, Error}; use futures_util::{StreamExt, SinkExt}; use bytes::Bytes;  pub async fn copy_out_csv(conn_str: &str, table: &str) -> Result<Vec<u8>, Error> {     let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;     tokio::spawn(async move { let _ = connection.await; });      let sql = format!("COPY {} TO STDOUT WITH (FORMAT csv, HEADER true)", table);     let mut stream = client.copy_out(&*sql).await?;     let mut out = Vec::new();      while let Some(chunk) = stream.next().await {         out.extend_from_slice(&chunk?);     }     Ok(out) }  pub async fn copy_in_csv(conn_str: &str, table: &str, data: &[u8]) -> Result<(), Error> {     let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?;     tokio::spawn(async move { let _ = connection.await; });      let sql = format!("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)", table);     let mut sink = client.copy_in(&*sql).await?;     sink.send(Bytes::from(data.to_vec())).await?;     sink.close().await?;     Ok(()) }

COPY закрывает большинство задач массовых загрузок и выгрузок в Postgres. Если использовать его с оглядкой на права, форматы, кодировки и план обслуживания таблицы, получаем стабильные и быстрые пайплайны. В спорных местах проще выдержать staging, в горячих работать потоками без диска, в инфраструктурных переносах просто использовать бинарный формат при совпадающих версиях.

Приглашаем вас пройти вступительное тестирование по теме «Базы данных». Тестирование поможет объективно оценить текущий уровень знаний и навыков, необходимых для эффективного усвоения материала курса.

Курс «Базы данных» подробно рассматривает современные подходы к работе с системами управления базами данных, включая эффективные методы загрузки и выгрузки данных, управление таблицами и транзакциями, а также практическое применение SQL. Приглашаем вас присоединиться к обучению, чтобы получить системные знания и навыки в этой области.


ссылка на оригинал статьи https://habr.com/ru/articles/935454/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *