Привет, Хабр.
Сегодня разбираем COPY в PostgreSQL. Это рабочая лошадка для массовой загрузки и выгрузки данных.
Что делает COPY и чем он отличается от INSERT
COPY переносит данные между таблицей и файлом или потоками STDIN/STDOUT. Вариант COPY FROM загружает, COPY TO выгружает. Умеет в форматы text, csv, binary.
Поддерживает параметры ON_ERROR, FREEZE, HEADER и HEADER MATCH, FORCE_*, ENCODING, WHERE, а также запуск внешних программ через PROGRAM. Это раза в два быстрее любого батчевого INSERT при равных условиях и заметно проще в эксплуатации.
Минимальные конструкции, которыми пользуемся:
-- выгрузка таблицы в CSV в поток COPY public.events TO STDOUT WITH (FORMAT csv, HEADER true); -- загрузка из CSV из потока COPY public.events FROM STDIN WITH (FORMAT csv, HEADER true); -- загрузка из файла на сервере (нужны права) COPY public.events FROM '/var/lib/postgresql/import.csv' WITH (FORMAT csv, HEADER true); -- через внешнюю программу (распаковка на лету) COPY public.events FROM PROGRAM 'gzip -dc /data/import.csv.gz' WITH (FORMAT csv, HEADER true); -- выгрузка результата запроса, не всей таблицы COPY ( SELECT id, payload, created_at FROM public.events WHERE created_at >= DATE '2025-01-01' ) TO STDOUT WITH (FORMAT csv, HEADER true);
COPY против \copy в psql
Важно разделять два мира. COPY ... FROM/TO 'filename' и COPY ... PROGRAM работают на стороне сервера и требуют специальных ролей. Файлы должны быть доступны именно серверу. Метакоманда \copy из psql это обёртка вокруг COPY ... FROM STDIN/TO STDOUT, и файлы читаются/пишутся на стороне клиента. Если нет серверных прав или файл лежит у вас локально, то используем уже \copy. Если нужно максимальное быстродействие и файлы уже на сервере,то тут уже нужен серверный COPY.
Пример для psql:
-- клиентская выгрузка в сжатый файл \copy (SELECT * FROM public.events) TO PROGRAM 'gzip > /tmp/events.csv.gz' WITH (FORMAT csv, HEADER true) -- клиентская загрузка \copy public.events FROM '/home/user/import.csv' WITH (FORMAT csv, HEADER true)
Обработка ошибок при загрузке
По дефолту COPY FROM завершится на первой проблемной строке. В ситуациях «загружаем всё, что валидно, остальное в карантин» помогает ON_ERROR ignore:
COPY public.events FROM STDIN WITH ( FORMAT csv, HEADER true, ON_ERROR ignore, LOG_VERBOSITY verbose );
При ignore все некорректные строки будут пропущены, в конце придёт NOTICE с количеством пропусков. С LOG_VERBOSITY verbose сервер дополнительно пишет, какая строка и какая колонка упали на конверсии. Это удобно, но не превращаем это в норму на постоянной основе: для чистых пайплайнов лучше staging-таблицы с текстовыми колонками, грузить туда, а затем явной валидацией и приведение типов вносить в целевую структуру.
Пример staging-потока:
-- сырой слой CREATE UNLOGGED TABLE staging.events_raw ( id_text text, payload_text text, created_at_text text ); -- без ограничений, чтобы грузить максимально быстро COPY staging.events_raw FROM STDIN WITH (FORMAT csv, HEADER true); -- чистовой слой INSERT INTO public.events (id, payload, created_at) SELECT id_text::bigint, payload_text::jsonb, created_at_text::timestamptz FROM staging.events_raw WHERE id_text ~ '^\d+$' AND created_at_text IS NOT NULL ON CONFLICT (id) DO UPDATE SET payload = EXCLUDED.payload, created_at = EXCLUDED.created_at;
Где смотреть прогресс
Во время работы COPY сервер публикует состояние в pg_stat_progress_copy.
SELECT pid, datname, relid::regclass AS relation, command, type, bytes_processed, bytes_total, tuples_processed, tuples_excluded, error_count FROM pg_stat_progress_copy;
tuples_excluded растёт, если используется WHERE и строки отбрасываются. error_count полезен при ON_ERROR ignore.
Форматы: когда какой
Три режима покрывают почти все случаи.
Текстовый формат даёт табуляцию как разделитель и \N как NULL. Он есть, но редко нужен: CSV удобнее и предсказуемее, а бинарный быстрее.
CSV — рабочий стандарт для интеграций. Пара нюансов:
-
HEADER trueдобавляет заголовок на выгрузке и пропускает первую строку на загрузке. Вход можно усилитьHEADER MATCH, тогда заголовок обязан совпасть с реальными именами колонок и порядком. -
Пустая строка и NULL различаются. Пустая строка это
"", NULL это пусто без кавычек, либо строка из параметраNULL '...'. -
Для явного управления кавычками и экранированием используем
QUOTE,ESCAPE, а для принудительного заключения некоторых колонокFORCE_QUOTE (col1, col2).
Бинарный формат выгоден для потока Postgres → Postgres одной версии, например при переносе больших объёмов между кластерами в одной инфраструктуре. Версии должны совпадать, типы колонок совместимы, данные не перемещаем между архитектурами, где отличается порядок байтов.
Примеры CSV-выгрузки и загрузки с тонкостями:
-- выгружаем с принудительными кавычками для текстовых колонок COPY public.users (id, email, country) TO STDOUT WITH (FORMAT csv, HEADER true, FORCE_QUOTE (email, country)); -- грузим, требуя строгого совпадения заголовка COPY public.users (id, email, country) FROM STDIN WITH (FORMAT csv, HEADER match);
FREEZE: когда да, когда нет
COPY FROM ... WITH (FREEZE true) замораживает строки сразу, как после VACUUM FREEZE. Это ускоряет первичную загрузку новой или только что очищенной таблицы и уменьшает давление на автovacuum. Условия там строгие: таблица должна быть создана или опустошена в текущей транзакции, без открытых курсоров и конкурирующих снимков. На секционированных таблицах FREEZE сейчас не применяется.
Сценарий:
BEGIN; CREATE TABLE public.import_users (LIKE public.users INCLUDING ALL); COPY public.import_users FROM PROGRAM 'gzip -dc /data/users.csv.gz' WITH (FORMAT csv, HEADER true, FREEZE true); ANALYZE public.import_users; COMMIT;
WHERE прямо в COPY
На загрузке можно отбрасывать строки по условию.
COPY public.events (id, payload, created_at) FROM STDIN WITH (FORMAT csv, HEADER true) WHERE id IS NOT NULL AND created_at >= DATE '2025-01-01';
Количество исключённых строк будет видно в pg_stat_progress_copy.
Безопасность
Если используете COPY ... FROM/TO 'filename' или PROGRAM, нужны привилегии суперпользователя или членство в ролях pg_read_server_files, pg_write_server_files, pg_execute_server_program.
Это осознанное ограничение: сервер получает доступ к файловой системе и shell. В PROGRAM команда запускается через оболочку, поэтому недоверенный ввод встраивать нельзя. Для таких сценариев держите фиксированные строки, белые списки и внимательное экранирование.
Ещё два момента. Для COPY TO путь обязан быть абсолютным. Для COPY FROM это рекомендация, но лучше не полагаться на рабочую директорию кластера. И не забываем про права на таблицы: для COPY TO нужно право SELECT на колонки, для COPY FROM право INSERT.
Кодировки и форматы дат
Если файл не в кодировке клиента, задаем ENCODING '...' в COPY. Для переносимой выгрузки я перед выгрузкой ставим ISO-формат дат:
SET DateStyle TO ISO, YMD; COPY (SELECT * FROM public.events WHERE created_at >= DATE '2025-01-01') TO STDOUT WITH (FORMAT csv, HEADER true, ENCODING 'UTF8');
На CSV все символы значимы, включая пробелы. Если источник дополняет строки пробелами до фиксированной ширины, чистим файл до загрузки. В CSV значение может включать переносы строк, если оно в кавычках.
COPY из кода: Python, Go, Rust
Python, psycopg3
Нормальная потоковая работа делается через cursor.copy(...). COPY не поддерживает параметры, поэтому SQL-строку формируем сами из белого списка идентификаторов.
import psycopg from psycopg.rows import dict_row def copy_csv_to_table(dsn: str, table: str, csv_path: str): if table not in {"public_events", "public_users"}: raise ValueError("table not allowed") copy_sql = f"COPY {table} FROM STDIN WITH (FORMAT csv, HEADER true)" with psycopg.connect(dsn, autocommit=False) as conn: with conn.cursor(row_factory=dict_row) as cur, open(csv_path, "rb") as f: with cur.copy(copy_sql) as cp: while chunk := f.read(256 * 1024): cp.write(chunk) conn.commit() def copy_table_to_csv(dsn: str, table: str, out_path: str): if table not in {"public_events", "public_users"}: raise ValueError("table not allowed") copy_sql = f"COPY {table} TO STDOUT WITH (FORMAT csv, HEADER true)" with psycopg.connect(dsn, autocommit=True) as conn: with conn.cursor() as cur, open(out_path, "wb") as f: with cur.copy(copy_sql) as cp: for data in cp: f.write(data) def copy_between(dsn_src: str, dsn_dst: str, src_query: str, dst_table: str): # перенос Postgres → Postgres в бинарном формате при совпадающих версиях sql_out = f"COPY ({src_query}) TO STDOUT WITH (FORMAT binary)" sql_in = f"COPY {dst_table} FROM STDIN WITH (FORMAT binary)" with psycopg.connect(dsn_src) as s, psycopg.connect(dsn_dst) as d: with s.cursor() as cs, d.cursor() as cd: with cs.copy(sql_out) as cp_out, cd.copy(sql_in) as cp_in: for data in cp_out: cp_in.write(data) d.commit()
Замечания по безопасности: никаких пользовательских фрагментов в строке COPY, только whitelisting таблиц и колонок. Пакуем данные крупными блоками, на маленьких кусках будет лишняя системная возня.
Go, pgx
В pgx есть CopyFrom, который забирает источник строк и шлёт их по протоколу COPY. Это удобнее, чем самому форматировать CSV, и быстрее, чем INSERT.
package bulk import ( "context" "encoding/csv" "io" "strconv" "github.com/jackc/pgx/v5" ) func CopyRows(ctx context.Context, conn *pgx.Conn, table string, columns []string, rows [][]any) (int64, error) { // columns вроде []string{"id", "payload", "created_at"} return conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(rows)) } func CopyCSV(ctx context.Context, conn *pgx.Conn, table string, columns []string, r io.Reader) (int64, error) { dec := csv.NewReader(r) // пропускаем заголовок if _, err := dec.Read(); err != nil { return 0, err } const batch = 5000 buf := make([][]any, 0, batch) var total int64 flush := func() error { if len(buf) == 0 { return nil } n, err := conn.CopyFrom(ctx, pgx.Identifier{table}, columns, pgx.CopyFromRows(buf)) total += n buf = buf[:0] return err } for { rec, err := dec.Read() if err == io.EOF { break } if err != nil { return total, err } id, _ := strconv.ParseInt(rec[0], 10, 64) row := []any{id, rec[1], rec[2]} // пример buf = append(buf, row) if len(buf) >= batch { if err := flush(); err != nil { return total, err } } } if err := flush(); err != nil { return total, err } return total, nil }
Здесь не генерируем CSV сами. pgx упакует всё в протокол COPY.
Rust, tokio-postgres
У клиента есть copy_out и copy_in. Работает потоками, без промежуточных файлов.
use tokio_postgres::{NoTls, Error}; use futures_util::{StreamExt, SinkExt}; use bytes::Bytes; pub async fn copy_out_csv(conn_str: &str, table: &str) -> Result<Vec<u8>, Error> { let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?; tokio::spawn(async move { let _ = connection.await; }); let sql = format!("COPY {} TO STDOUT WITH (FORMAT csv, HEADER true)", table); let mut stream = client.copy_out(&*sql).await?; let mut out = Vec::new(); while let Some(chunk) = stream.next().await { out.extend_from_slice(&chunk?); } Ok(out) } pub async fn copy_in_csv(conn_str: &str, table: &str, data: &[u8]) -> Result<(), Error> { let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await?; tokio::spawn(async move { let _ = connection.await; }); let sql = format!("COPY {} FROM STDIN WITH (FORMAT csv, HEADER true)", table); let mut sink = client.copy_in(&*sql).await?; sink.send(Bytes::from(data.to_vec())).await?; sink.close().await?; Ok(()) }
COPY закрывает большинство задач массовых загрузок и выгрузок в Postgres. Если использовать его с оглядкой на права, форматы, кодировки и план обслуживания таблицы, получаем стабильные и быстрые пайплайны. В спорных местах проще выдержать staging, в горячих работать потоками без диска, в инфраструктурных переносах просто использовать бинарный формат при совпадающих версиях.
Приглашаем вас пройти вступительное тестирование по теме «Базы данных». Тестирование поможет объективно оценить текущий уровень знаний и навыков, необходимых для эффективного усвоения материала курса.
Курс «Базы данных» подробно рассматривает современные подходы к работе с системами управления базами данных, включая эффективные методы загрузки и выгрузки данных, управление таблицами и транзакциями, а также практическое применение SQL. Приглашаем вас присоединиться к обучению, чтобы получить системные знания и навыки в этой области.
ссылка на оригинал статьи https://habr.com/ru/articles/935454/
Добавить комментарий