PostgreSQL + VectorChord = Гибридный поиск. Часть 1. Инфраструктура

от автора

Привет Хабр! Меня зовут Владимир и сегодня я буду развивать тему фишечки VectorChord про которую упомянул в предыдущей статье.

В данном материале я покажу, как поднять инфраструктуру с VectorChord, настроить VechordRegistry, написать пайплайны работы с БД, организовать гибридный поиск и добавить простейший реранкинг.

Поехали.

Настройка инфраструктуры

Начнем с настройки базы данных. Напишем docker-compose-dev.yml для развёртывания контейнера с базой данных

services:  postgres:    image: tensorchord/vchord-suite:pg18-latest    environment:      POSTGRES_DB: ${DB__NAME}      POSTGRES_USER: ${DB__USER}      POSTGRES_PASSWORD: ${DB__PASSWORD}    volumes:      - pgdata:/var/lib/postgresql    ports:      - "5432:5432"    healthcheck:      test: ["CMD-SHELL", "pg_isready -U ${DB__USER} -d ${DB__NAME}"]      interval: 5s      timeout: 5s      retries: 5volumes:  pgdata:

Проверим работоспособность. Поднимем контейнер с базой данных и увидим девственно чистую базу данных:

Как можно увидеть, мы не устанавливали в нашу базу данных никаких расширений. Это не склероз, а сделано специально, позже покажу почему.

Перед написанием основного кода скопируем из предыдущего проекта на pgvector настройки сервиса с одной доработкой — добавим к настройкам базы данных поле namespace. Оно пригодится в дальнейшем для таблиц:

class PGConfig(BaseModel):    # предыдущий код    namespace: str

Можно переходить к написанию кода. И первым на очереди у нас таблица данных для PostgreSQL.

Таблицы данных

Документы будем хранить в двух таблицах — одна для документа целиком, вторая — для чанков. В таблице с документами будем хранить исходные тексты. Это единица управления контентом. Таблица чанков — основа поисковой системы (документы слишком велики для поиска, поэтому их необходимо разбивать на части). Но начнём с третьей, базовой таблицы — в ней будут поля для фиксации времени создания/обновления записи и общее поле с метаданными.

# файл db_models/base.pyfrom datetime import datetime, timezonefrom functools import partialimport msgspecfrom psycopg.types.json import Jsonbfrom vechord import Tableclass BaseTable(Table, kw_only=True):    metadata: Jsonb    created_at: datetime = msgspec.field(        default_factory=partial(datetime.now, timezone.utc))    updated_at: datetime = msgspec.field(        default_factory=partial(datetime.now, timezone.utc))

Таблица наследуется от базового класса таблиц vechord.Table, которая, в свою очередь, наследуется от msgspec.Struct.

Краткая справка: msgspec — это как Pydantic, только без огромной экосистемы, зато в несколько раз быстрее.

Поля created_at / updated_at формируются автоматически при создании записи. Т.к. способа автоматически обновлять значение поля updated_at (как в sqlalchemy) я не нашел, поэтому будем делать вручную в коде метода обновления записи.

Официальный User Guide рекомендует отдельно определить DenseVector = Vector[emd_dim], не будем пренебрегать советом:

from core import settingsDenseVector = Vector[settings.db.embedding_dim]

Теперь таблица для документов:

# файл db_models/document.pyimport msgspecfrom vechord.spec import PrimaryKeyUUIDfrom .base import BaseTableclass Document(BaseTable, kw_only=True):    uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory)    title: str    text: str

Для первичного ключа (если не хотим прописывать его каждый раз вручную) у vechord есть два базовых метода — vechord.spec.PrimaryKeyAutoIncrease и vechord.spec.PrimaryKeyUUID. Первый генерирует автоинкрементируемое поле целого типа, второй — UUID. Мне привычнее UUID, поэтому использую его. Ну и последняя таблица:

# файл db_models/doc_chunk.pyfrom vechord.spec import ForeignKey, Keyword, PrimaryKeyUUID, Vectorclass Chunk(BaseTable, kw_only=True):    uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory)    doc_id: Annotated[UUID, ForeignKey[Document.uid]]    content: str    content_tsv: Keyword    embedding: DenseVector    chunk_index: int

Поле content_tsv с типом vechord.spec.Keyword будет содержать данные для полнотекстового поиска, поле embedding — векторное представление текста из поля content. А у поля с типом ForeignKey есть одна особенность — он автоматически реализует ON DELETE CASCADE. Может быть у кого-то возникнет вопрос — “А почему поле uid не вынести в базовый класс?”. Попробуйте, у меня не вышло. Может быть я не очень разобрался в msgspec, но когда я размещал поле uid в базовой таблице, то ForeignKey[Document.uid] искал uid не в таблице Document, а в таблице BaseTable. Поэтому пришлось поле писать два раза.

Перейдём к способу создания подключения к БД и таблиц — классу VechordRegistry. Это прям швейцарский нож для базы данных. В классе есть клиент с AsyncConnectionPool, инструменты для создания таблиц, инструменты для реализации CRUD, инструменты разных видов поиска. Короче, всё что нужно в одном классе. Ну и использовать его можно как привычный асинхронный контекстный менеджер или замутить pipeline с помощью декоратора @vr.inject. Создадим этот замечательный объект в файле __init__.py

from vechord import VechordRegistryfrom core import settingsfrom .document import Documentfrom .doc_chunk import Chunkvr = VechordRegistry(    namespace=settings.db.namespace,    url=settings.db.db_url,    tables=[Document, Chunk])

В качестве параметров в конструктор передаются namespace (используется как префикс в названиях таблиц, которые создаст наш Регистратор), url для доступа к базе данных и список необходимых таблиц. Ну а теперь к вопросу, почему я не делал инициализатор. Если покопаться в коде VechordRegistry, то можно найти вот такие интересные куски:

    async def init_extension(self):        async with (            await AsyncConnection.connect(self.url) as conn,            conn.cursor() as cursor,        ):            await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord CASCADE")            await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord_bm25")            await cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_tokenizer")            await cursor.execute(                'SET search_path TO "$user", public, bm25_catalog, tokenizer_catalog'            )    async def __aenter__(self):        await self.init_extension()

Таким образом, при первом вызове async with vr: он сам установит все необходимые расширения. Ну а если продолжить изучать код, то найдём и методы для создания таблиц, и создание базовых токенизаторов (необходимы для полнотекстового поиска). Проверим как это работает. Напишем простенький main.py файл:

import asyncioimport sysfrom psycopg.types.json import Jsonbfrom db_models import Document, vrasync def main():    async with vr:        doc = Document(title='Note', text='Some text', metadata=Jsonb({}))        await vr.insert(doc)        docs = await vr.select_by(Document.partial_init(), limit=1)        print(docs)if __name__ == '__main__':    if sys.platform == 'win32':        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())    asyncio.run(main())

В методе мы создадим документ и сразу попытаемся его получить. Для записи / чтения базы используются собственные методы VechordRegistry. Для получения записей в метод VechordRegistry.select_by необходимо передать объект таблицы, который должен быть создан с помощью Table.partial_init(). Указанные в конструкторе значения будут использоваться для фильтрации. Посмотрим что с самой базой:

Как видим, VechordRegistry установил необходимые расширения, создал токенизаторы, таблицы и успешно добавил наш документ в БД. Перейдём к разработке сервиса-надстройки для VechordRegistry.

Сервис работы с документами

Хоть VechordRegistry сам по себе является абстракцией над голой базой данных, нам необходимо реализовать дополнительный слой абстракции. На нём, как минимум, необходимо реализовать создание и изменение чанков при соответствующих действиях с документом и метод гибридного поиска.

Начнём с создания документа. При вызове метода создания нам необходимо будет записать документ в базу данных, затем нарезать его на чанки и также сохранить из в базе. Для этой задачи идеально подойдёт vechord.registry.VechordPipeline. Как это работает: мы создаём несколько функций, выполняющих обработку данных. Каждую функцию необходимо продекорировать с помощью @vr.inject указав в качестве параметра входную (для извлечения данных из БД), выходную (для сохранения данных в БД) или обе таблицы. Далее мы создаём объект класса VechordPipeline, в который передадим список наших функций. Первая функция будет использоваться для получения входных данных, последняя — для возврата выходных. Остальные функции будут использоваться для промежуточной обработки данных. Функции будут выполняться в порядке их следования в списке, формируя конвейер обработки.

Напишем пайплайн создания документа. В нём будет два этапа — создание документа и создание чанков текста документа. Но начнем пайплайн с входной Pydantic схемы, которую в дальнейшем будем использовать в API:

# файл schemas/document.pyfrom typing import Annotated, Anyfrom pydantic import BaseModel, Fieldclass DocumentBase(BaseModel):    title: Annotated[str, Field(..., description='Заголовок документа')]    text: Annotated[str, Field(..., description='Содержимое документа')]    meta_data: Annotated[        dict[str, Any],        Field(default_factory=dict, description='Метаданные документа')]class DocumentCreate(DocumentBase):    pass

Как и раньше, делаем базовую модель, от неё наследуем модель создания. Теперь перейдём к реализации шагов пайплайна. Из-за особенностей работы декоратора их нельзя поместить в тело класса — self будет считаться параметром, необходимым к получению из базы данных, а не ссылкой на объект класса. Поэтому вынесем шаги в отдельный файл. Начнём с create_doc:

# файл services/utils.pyfrom psycopg.types.json import Jsonbfrom db_models import Document, vrfrom schemas.document import DocumentCreate@vr.inject(output=Document)async def _create_document(doc_data: DocumentCreate) -> Document:    doc = Document(        title=doc_data.title,        text=doc_data.text,        metadata=Jsonb(doc_data.metadata)    )    return doc

Декоратор @vr.inject с параметром output=Document автоматически сохранит созданый объект в БД. Теперь метод для создания чанков. С ним немного посложнее, т.к. для него нам необходимы инструменты для нарезания текста и получения эмбеддингов. Пока просто используем заглушки:

# файл services/utils.py@vr.inject(input=Document, output=Chunk)async def create_chunks(uid: UUID, text: str) -> list[Chunk]:    chunks = await _chunker.segment(text)    return [        Chunk(            doc_id=uid,            content=chunk,            content_tsv=Keyword(chunk),            embedding=DenseVector(await _embedder.vectorize_chunk(chunk)),            metadata=Jsonb({}),            chunk_index=i        )        for i, chunk in enumerate(chunks, start=1)    ]

В параметры декоратора @vr.inject передаём, что вход у нас это таблица Document, а выход — Chunk. В параметрах указываем именно класс таблицы, а не что возвращает метод, поэтому без list[]. Во входных параметрах функции необходимо указать имена полей таблицы Document которые хотим видеть в методе. Соберём пайплайн:

# файл services/document.pyfrom db_models import vrfrom .utils import create_chunks, create_documentclass DocumentService:    async def create_document(self, doc_data: DocumentCreate):        pipeline = vr.create_pipeline([self._create_document, self._create_chunks])        await pipeline.run(doc_data)

Готовенько. Время протестить. Но сначала определимся с чанкером и эмбеддером.

Для получения чанков в библиотеке vechord.chunk имеется три класса: RegexChunker, SpacyChunker и GeminiChunker. RegexChunker просто разбивает по определённому набору символов через регулярку, GeminiChunker требует API-ключ. Остаётся SpacyChunker на базе библиотеки spacy. Чтобы SpacyChunker заработал, надо доустановить библиотеку spacy и загрузить модель для чанкирования. Для русского языка доступно три модели: ru_core_news_sm, ru_core_news_md и ru_core_news_lg. Отличаются размером, точностью и скоростью работы. Возьмем среднюю:

uv add spacy==3.8.13python -m spacy download ru_core_news_md

Если не получается, можно скачать модель с гитхаба и установить напрямую

uv add <путь/к/файлу>

С эмбеддингами всё немного посложнее. В vechord.embedding есть эмбеддеры от Gemini, Jina, Voyage и даже OpenAI. Но у всех одна проблема — им нужен API-ключ. Есть класс SpacyDenseEmbedding, но тут даже сами разработчики предупреждают, что это не серьёзно. Но, т.к. весь этот материал — скорее эксперимент, чем production ready продукт, воспользуемся именно SpacyDenseEmbedding. Создадим экземпляры:

from vechord.chunk import SpacyChunkerfrom vechord.embedding import SpacyDenseEmbedding@lru_cachedef get_chunker() -> SpacyChunker:    ch = SpacyChunker(model='ru_core_news_md')    return ch@lru_cachedef get_embedder() -> SpacyDenseEmbedding:    emb = SpacyDenseEmbedding(model='ru_core_news_md')    return emb_chunker = get_chunker()_embedder = get_embedder()

Проверим работоспособность пайплайна. Перепишем наш main.py:

from services import DocumentServicefrom schemas.document import DocumentCreateasync def main():    async with vr:        await DocumentService.create_document(            DocumentCreate(title='Тестовый документ', text="""Длинный текст документа"""))

Смотрим результат

Отлично. Документ загрузился, поделился на чанки, для каждого чанка получен вектор и какие-то ключевые слова. Пайплайн работает. Однако в текущем виде он только пишет в БД. Хорошим тоном является возвращать созданный объект (ну или как минимум его первичный ключ). Реализуем. И как всегда начинаем со схемы ответа:

# файл schemas/document.pyclass DocumentResponse(DocumentBase):    uid: Annotated[UUID, Field(..., description='Идентификатор документа')]    created_at: Annotated[        datetime, Field(..., description='Дата и время создания документа')]    updated_at: Annotated[        datetime,        Field(..., description='Дата и время последнего обновления документа')]    model_config = ConfigDict(from_attributes=True)

Схема есть. Теперь напишем финальный шаг для пайплайна создания:

# файл services/utils.py@vr.inject(input=Document)async def get_document(uid: UUID) -> DocumentResponse | None:    docs = await vr.select_by(Document.partial_init(uid=uid))    return DocumentResponse.model_validate(docs[0]) if docs else None

Не понял, баг это или фича, но сразу получить объект класса Document нельзя — @vr.inject позволяет читать только конкретные поля таблицы, переданной в параметре input. Поэтому берём uid, вручную читаем объект с БД и возвращаем его (либо None, если что-то пошло не так).

Теперь обновление. В документации и коде явного update метода я не нашел. Поэтому лютый колхоз. Пишем update модель

# файл schemas/document.pyclass DocumentUpdate(BaseModel):    title: Annotated[str | None, Field(None, description='Заголовок документа')]    text: Annotated[str | None, Field(None, description='Содержимое документа')]    metadata: Annotated[        dict[str, Any],        Field(default_factory=dict, description='Метаданные документа')]    def is_empty(self) -> bool:        return all([            self.title is None,            self.text is None,            not self.metadata        ])

Это та же модель создания документа, но все поля опциональны. Метод is_empty добавлен, чтобы проще валидировать наличие данных в модели. Теперь реализуем саму функцию update:

# файл services/utils.py@vr.inject(output=Document)async def update_document(uid: UUID, doc_data: DocumentUpdate) -> Document:    docs = await vr.select_by(Document.partial_init(uid=uid))    if not docs:        raise ValueError(f'Document {uid} not found')    doc = docs[0]    new_doc = Document(        title=doc_data.title if doc_data.title is not None else doc.title,        text=doc_data.text if doc_data.text is not None else doc.text,        metadata=Jsonb(doc_data.metadata) if doc_data.metadata is not None else doc.metadata,        updated_at=datetime.now(timezone.utc),        created_at=doc.created_at,        uid=doc.uid,    )    await vr.remove_by(Document.partial_init(uid=uid))    return new_doc

При обновлении мы должны сохранить ID и дату создания. Поэтому, получаем документ, создаём новый документ с частями старого (время создания и uid) и удаляем старый. При удалении старого документа вместе с ним удалятся и его чанки (из-за ON DELETE CASCADE при объявлении поля vechord.spec.ForeignKey). Далее новый документ записывается в базу данных. Теперь пайплайн

class DocumentService:    # предыдущий код    @staticmethod    async def update_document(uid: UUID, doc_data: DocumentUpdate) -> DocumentResponse | None:        pipeline = vr.create_pipeline([update_document, create_chunks, get_document])        return await pipeline.run(uid=uid, doc_data=doc_data)

Дополним наш main.py строчкой:

# Где-то после создания документаawait DocumentService.update_document(    uid=UUID(<UUID созданного документа>), doc_data=DocumentUpdate(title='Изменённый текстовый документ'))

Проверим:

Работает — время создания документа старое, а время обновления и чанки — новые. Добиваем наш сервис методами получения/удаления в виде простой прослойки и переходим к поиску

class DocumentService:    # предыдущий код    @staticmethod    async def get_document(uid: UUID) -> DocumentResponse | None:        docs = await vr.select_by(Document.partial_init(uid=uid))        return DocumentResponse.model_validate(docs[0]) if docs else None    @staticmethod    async def list_documents(limit: int | None = None) -> list[DocumentResponse]:        docs = await vr.select_by(Document.partial_init(), limit=limit)        return [DocumentResponse.model_validate(doc) for doc in docs]    @staticmethod    async def delete_document(uid: UUID) -> None:        await vr.remove_by(Document.partial_init(uid=uid))

Сервис реализации поиска

По традиции, начнем реализацию поиска со схем:

# файл schemas/search.pyclass SearchBase(BaseModel):    query: Annotated[str, Field(..., description='Поисковый запрос')]    topk : Annotated[        int, Field(10, ge=1, description='Количество результатов для возврата')]class VectorSearchRequest(SearchBase):    probe: Annotated[        int | None, Field(None, description='Сколько кластеров K-means нужно проверить')]class KeywordSearchRequest(SearchBase):    pass

В схемах поисковых запросов просто повторяем параметры соответствующих методов VechordRegistry, за одним исключением — векторизовать запросы мы будем уже в нашем сервисе, тем же эмбеддером, которым получали векторы чанков. Поэтому поле запроса — строка.

Теперь схема возврата результата:

# файл schemas/search.pyclass ChunkResponse(BaseModel):    uid: Annotated[UUID, Field(..., description='Идентификатор чанка')]    doc_id: Annotated[        UUID, Field(..., description='Идентификатор документа')]    content: Annotated[str, Field(..., description='Содержимое чанка')]    created_at: Annotated[        datetime, Field(..., description='Дата и время создания чанка')]    updated_at: Annotated[        datetime,        Field(..., description='Дата и время последнего обновления чанка')]    model_config = ConfigDict(from_attributes=True)

Теперь реализуем сервис поиска. Начнем реализацию с базовых методов поиска — векторного и полнотекстового:

# файл services/search.pyclass SearchService:    @staticmethod    async def vector_search(request: VectorSearchRequest) -> list[ChunkResponse]:        vector = await _embedder.vectorize_chunk(request.query)        results = await vr.search_by_vector(            Chunk, vec=vector, topk=request.topk, probe=request.probe)        return [ChunkResponse.model_validate(r) for r in results]    @staticmethod    async def keyword_search(query: KeywordSearchRequest) -> list[ChunkResponse]:        results = await vr.search_by_keyword(Chunk, keyword=query.keyword, topk=query.topk)        return [ChunkResponse.model_validate(r) for r in results]

Для векторного поиска сначала получаем вектор запроса. Ну а в остальном, методы просто передают параметры в методы класса VechordRegistry.

Ну и самое интересное — гибридный поиск

Гибридный поиск

Гибридный поиск обеспечивает высокую полноту выдачи (recall), объединяя семантический и ключевой поиск. Это позволяет находить документы как по смыслу, так и по точным совпадениям слов, компенсируя слабые стороны каждого метода в отдельности. Однако, методы первого этапа часто жертвуют точностью ради скорости. Здесь на помощь приходит реранкинг: финальный отбор лучших. Модель оценивает контекстное соответствие запроса и документа, исправляя ошибки первого этапа. Таким образом система сочетает скорость поиска по большим данным с точностью понимания смысла, выводя на первые позиции только то, что действительно нужно пользователю.

Библиотека vechord предоставляет свои реранкеры. Но, как и в случае с эмбеддерами, CohereReranker и JinaReranker требуют API ключи. Остаётся простейший ReciprocalRankFusion, названый так в честь используемого алгоритма. Название можно перевести как “Объединение обратных рангов”, и оно фактически описывает алгоритм — итоговый ранг документа формируется путём суммирования обратных величин его позиций в результатах поиска с добавлением константы для сглаживания влияния крайних значений. В итоге, чем выше документ в отдельных списках и чем чаще он встречается на верхних позициях, тем выше его итоговый ранг:

\text{score}(d) = \sum_{i=1}^{n} \frac{1}{k + \text{rank}_i(d)}

где \text{score}(d) — итоговый балл документа d, n — количество списков результатов, \text{rank}_i(d) — позиция документа d в i-м списке, k — константа (обычно 60).

Перейдём от теории к практике. Для начала реализуем модель запроса

class HybridSearchRequest(VectorSearchRequest, KeywordSearchRequest):    boost: Annotated[        int, Field(3, ge=1, description='Коэффициент расширения выборки')]

Параметр boost нужен для повышения итоговой релевантности, т.к. если передать реранкеру ровно topk документов, то ему не из чего выбирать — он просто переставит местами уже отобранные документы, не улучшив качество выборки. Далее сам метод

class SearchService:    # предыдущий код    @staticmethod    async def hybrid_search_fuse(request: HybridSearchRequest) -> list[ChunkResponse]:        rrf = ReciprocalRankFusion()        vector = await _embedder.vectorize_chunk(request.query)        return rrf.fuse(            [                await vr.search_by_vector(                    Chunk, vec=vector, topk=request.topk * request.boost, probe=request.probe),                await vr.search_by_keyword(Chunk, keyword=request.keyword, topk=request.topk * request.boost)            ]        )[:request.topk]

С поиском как-бы всё.

API

API расписывать подробно не буду, т.к. фактически его копируем из репозитория (который git) для первой статьи и заменяем вызов методов репозитория (который паттерн) на вызов соответствующих методов сервиса. Отметить стоит, что из-за особенности VechordPipeline последний метод пайплайна возвращает список значений, поэтому в API для создания документа мы используем:

async def create_document(data: DocumentCreate):    res = await DocumentService.create_document(data)    return res[0]

Остался main.py. Он стандартный для FastAPI приложения, прокомментирую только метод lifespan:

@asynccontextmanagerasync def lifespan(app: FastAPI):    async with vr:        yield

В lifespan мы открываем пул соединений нашего VechordRegistry, который будет работать, пока работает приложение.

Внезапное окончание

Что имеем на текущий момент:

  • асинхронная работа с PostgreSQL через VechordRegistry

  • автоматическое разбиение документов на чанки (SpacyChunker) и получение эмбеддингов (SpacyDenseEmbedding)

  • полнотекстовый, векторный и гибридный поиск с реранкингом (ReciprocalRankFusion)

  • базовый API для документов с каскадным обновлением чанков

В текущей реализации эмбеддер и чанкер — игрушечные (из библиотеки spacy). Для боевого использования стоит заменить их на более серьёзные модели. В следующей части как раз этим и займемся — реализуем наследование от базовых классов, используя различные варианты.

Код проекта доступен тут.

ссылка на оригинал статьи https://habr.com/ru/articles/1024810/