Привет Хабр! Меня зовут Владимир и сегодня я буду развивать тему фишечки VectorChord про которую упомянул в предыдущей статье.
В данном материале я покажу, как поднять инфраструктуру с VectorChord, настроить VechordRegistry, написать пайплайны работы с БД, организовать гибридный поиск и добавить простейший реранкинг.
Поехали.
Настройка инфраструктуры
Начнем с настройки базы данных. Напишем docker-compose-dev.yml для развёртывания контейнера с базой данных
services: postgres: image: tensorchord/vchord-suite:pg18-latest environment: POSTGRES_DB: ${DB__NAME} POSTGRES_USER: ${DB__USER} POSTGRES_PASSWORD: ${DB__PASSWORD} volumes: - pgdata:/var/lib/postgresql ports: - "5432:5432" healthcheck: test: ["CMD-SHELL", "pg_isready -U ${DB__USER} -d ${DB__NAME}"] interval: 5s timeout: 5s retries: 5volumes: pgdata:
Проверим работоспособность. Поднимем контейнер с базой данных и увидим девственно чистую базу данных:

Как можно увидеть, мы не устанавливали в нашу базу данных никаких расширений. Это не склероз, а сделано специально, позже покажу почему.
Перед написанием основного кода скопируем из предыдущего проекта на pgvector настройки сервиса с одной доработкой — добавим к настройкам базы данных поле namespace. Оно пригодится в дальнейшем для таблиц:
class PGConfig(BaseModel): # предыдущий код namespace: str
Можно переходить к написанию кода. И первым на очереди у нас таблица данных для PostgreSQL.
Таблицы данных
Документы будем хранить в двух таблицах — одна для документа целиком, вторая — для чанков. В таблице с документами будем хранить исходные тексты. Это единица управления контентом. Таблица чанков — основа поисковой системы (документы слишком велики для поиска, поэтому их необходимо разбивать на части). Но начнём с третьей, базовой таблицы — в ней будут поля для фиксации времени создания/обновления записи и общее поле с метаданными.
# файл db_models/base.pyfrom datetime import datetime, timezonefrom functools import partialimport msgspecfrom psycopg.types.json import Jsonbfrom vechord import Tableclass BaseTable(Table, kw_only=True): metadata: Jsonb created_at: datetime = msgspec.field( default_factory=partial(datetime.now, timezone.utc)) updated_at: datetime = msgspec.field( default_factory=partial(datetime.now, timezone.utc))
Таблица наследуется от базового класса таблиц vechord.Table, которая, в свою очередь, наследуется от msgspec.Struct.
Краткая справка:
msgspec— это какPydantic, только без огромной экосистемы, зато в несколько раз быстрее.
Поля created_at / updated_at формируются автоматически при создании записи. Т.к. способа автоматически обновлять значение поля updated_at (как в sqlalchemy) я не нашел, поэтому будем делать вручную в коде метода обновления записи.
Официальный User Guide рекомендует отдельно определить DenseVector = Vector[emd_dim], не будем пренебрегать советом:
from core import settingsDenseVector = Vector[settings.db.embedding_dim]
Теперь таблица для документов:
# файл db_models/document.pyimport msgspecfrom vechord.spec import PrimaryKeyUUIDfrom .base import BaseTableclass Document(BaseTable, kw_only=True): uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory) title: str text: str
Для первичного ключа (если не хотим прописывать его каждый раз вручную) у vechord есть два базовых метода — vechord.spec.PrimaryKeyAutoIncrease и vechord.spec.PrimaryKeyUUID. Первый генерирует автоинкрементируемое поле целого типа, второй — UUID. Мне привычнее UUID, поэтому использую его. Ну и последняя таблица:
# файл db_models/doc_chunk.pyfrom vechord.spec import ForeignKey, Keyword, PrimaryKeyUUID, Vectorclass Chunk(BaseTable, kw_only=True): uid: PrimaryKeyUUID = msgspec.field(default_factory=PrimaryKeyUUID.factory) doc_id: Annotated[UUID, ForeignKey[Document.uid]] content: str content_tsv: Keyword embedding: DenseVector chunk_index: int
Поле content_tsv с типом vechord.spec.Keyword будет содержать данные для полнотекстового поиска, поле embedding — векторное представление текста из поля content. А у поля с типом ForeignKey есть одна особенность — он автоматически реализует ON DELETE CASCADE. Может быть у кого-то возникнет вопрос — “А почему поле uid не вынести в базовый класс?”. Попробуйте, у меня не вышло. Может быть я не очень разобрался в msgspec, но когда я размещал поле uid в базовой таблице, то ForeignKey[Document.uid] искал uid не в таблице Document, а в таблице BaseTable. Поэтому пришлось поле писать два раза.
Перейдём к способу создания подключения к БД и таблиц — классу VechordRegistry. Это прям швейцарский нож для базы данных. В классе есть клиент с AsyncConnectionPool, инструменты для создания таблиц, инструменты для реализации CRUD, инструменты разных видов поиска. Короче, всё что нужно в одном классе. Ну и использовать его можно как привычный асинхронный контекстный менеджер или замутить pipeline с помощью декоратора @vr.inject. Создадим этот замечательный объект в файле __init__.py
from vechord import VechordRegistryfrom core import settingsfrom .document import Documentfrom .doc_chunk import Chunkvr = VechordRegistry( namespace=settings.db.namespace, url=settings.db.db_url, tables=[Document, Chunk])
В качестве параметров в конструктор передаются namespace (используется как префикс в названиях таблиц, которые создаст наш Регистратор), url для доступа к базе данных и список необходимых таблиц. Ну а теперь к вопросу, почему я не делал инициализатор. Если покопаться в коде VechordRegistry, то можно найти вот такие интересные куски:
async def init_extension(self): async with ( await AsyncConnection.connect(self.url) as conn, conn.cursor() as cursor, ): await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord CASCADE") await cursor.execute("CREATE EXTENSION IF NOT EXISTS vchord_bm25") await cursor.execute("CREATE EXTENSION IF NOT EXISTS pg_tokenizer") await cursor.execute( 'SET search_path TO "$user", public, bm25_catalog, tokenizer_catalog' ) async def __aenter__(self): await self.init_extension()
Таким образом, при первом вызове async with vr: он сам установит все необходимые расширения. Ну а если продолжить изучать код, то найдём и методы для создания таблиц, и создание базовых токенизаторов (необходимы для полнотекстового поиска). Проверим как это работает. Напишем простенький main.py файл:
import asyncioimport sysfrom psycopg.types.json import Jsonbfrom db_models import Document, vrasync def main(): async with vr: doc = Document(title='Note', text='Some text', metadata=Jsonb({})) await vr.insert(doc) docs = await vr.select_by(Document.partial_init(), limit=1) print(docs)if __name__ == '__main__': if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) asyncio.run(main())
В методе мы создадим документ и сразу попытаемся его получить. Для записи / чтения базы используются собственные методы VechordRegistry. Для получения записей в метод VechordRegistry.select_by необходимо передать объект таблицы, который должен быть создан с помощью Table.partial_init(). Указанные в конструкторе значения будут использоваться для фильтрации. Посмотрим что с самой базой:

Как видим, VechordRegistry установил необходимые расширения, создал токенизаторы, таблицы и успешно добавил наш документ в БД. Перейдём к разработке сервиса-надстройки для VechordRegistry.
Сервис работы с документами
Хоть VechordRegistry сам по себе является абстракцией над голой базой данных, нам необходимо реализовать дополнительный слой абстракции. На нём, как минимум, необходимо реализовать создание и изменение чанков при соответствующих действиях с документом и метод гибридного поиска.
Начнём с создания документа. При вызове метода создания нам необходимо будет записать документ в базу данных, затем нарезать его на чанки и также сохранить из в базе. Для этой задачи идеально подойдёт vechord.registry.VechordPipeline. Как это работает: мы создаём несколько функций, выполняющих обработку данных. Каждую функцию необходимо продекорировать с помощью @vr.inject указав в качестве параметра входную (для извлечения данных из БД), выходную (для сохранения данных в БД) или обе таблицы. Далее мы создаём объект класса VechordPipeline, в который передадим список наших функций. Первая функция будет использоваться для получения входных данных, последняя — для возврата выходных. Остальные функции будут использоваться для промежуточной обработки данных. Функции будут выполняться в порядке их следования в списке, формируя конвейер обработки.
Напишем пайплайн создания документа. В нём будет два этапа — создание документа и создание чанков текста документа. Но начнем пайплайн с входной Pydantic схемы, которую в дальнейшем будем использовать в API:
# файл schemas/document.pyfrom typing import Annotated, Anyfrom pydantic import BaseModel, Fieldclass DocumentBase(BaseModel): title: Annotated[str, Field(..., description='Заголовок документа')] text: Annotated[str, Field(..., description='Содержимое документа')] meta_data: Annotated[ dict[str, Any], Field(default_factory=dict, description='Метаданные документа')]class DocumentCreate(DocumentBase): pass
Как и раньше, делаем базовую модель, от неё наследуем модель создания. Теперь перейдём к реализации шагов пайплайна. Из-за особенностей работы декоратора их нельзя поместить в тело класса — self будет считаться параметром, необходимым к получению из базы данных, а не ссылкой на объект класса. Поэтому вынесем шаги в отдельный файл. Начнём с create_doc:
# файл services/utils.pyfrom psycopg.types.json import Jsonbfrom db_models import Document, vrfrom schemas.document import DocumentCreate@vr.inject(output=Document)async def _create_document(doc_data: DocumentCreate) -> Document: doc = Document( title=doc_data.title, text=doc_data.text, metadata=Jsonb(doc_data.metadata) ) return doc
Декоратор @vr.inject с параметром output=Document автоматически сохранит созданый объект в БД. Теперь метод для создания чанков. С ним немного посложнее, т.к. для него нам необходимы инструменты для нарезания текста и получения эмбеддингов. Пока просто используем заглушки:
# файл services/utils.py@vr.inject(input=Document, output=Chunk)async def create_chunks(uid: UUID, text: str) -> list[Chunk]: chunks = await _chunker.segment(text) return [ Chunk( doc_id=uid, content=chunk, content_tsv=Keyword(chunk), embedding=DenseVector(await _embedder.vectorize_chunk(chunk)), metadata=Jsonb({}), chunk_index=i ) for i, chunk in enumerate(chunks, start=1) ]
В параметры декоратора @vr.inject передаём, что вход у нас это таблица Document, а выход — Chunk. В параметрах указываем именно класс таблицы, а не что возвращает метод, поэтому без list[]. Во входных параметрах функции необходимо указать имена полей таблицы Document которые хотим видеть в методе. Соберём пайплайн:
# файл services/document.pyfrom db_models import vrfrom .utils import create_chunks, create_documentclass DocumentService: async def create_document(self, doc_data: DocumentCreate): pipeline = vr.create_pipeline([self._create_document, self._create_chunks]) await pipeline.run(doc_data)
Готовенько. Время протестить. Но сначала определимся с чанкером и эмбеддером.
Для получения чанков в библиотеке vechord.chunk имеется три класса: RegexChunker, SpacyChunker и GeminiChunker. RegexChunker просто разбивает по определённому набору символов через регулярку, GeminiChunker требует API-ключ. Остаётся SpacyChunker на базе библиотеки spacy. Чтобы SpacyChunker заработал, надо доустановить библиотеку spacy и загрузить модель для чанкирования. Для русского языка доступно три модели: ru_core_news_sm, ru_core_news_md и ru_core_news_lg. Отличаются размером, точностью и скоростью работы. Возьмем среднюю:
uv add spacy==3.8.13python -m spacy download ru_core_news_md
Если не получается, можно скачать модель с гитхаба и установить напрямую
uv add <путь/к/файлу>
С эмбеддингами всё немного посложнее. В vechord.embedding есть эмбеддеры от Gemini, Jina, Voyage и даже OpenAI. Но у всех одна проблема — им нужен API-ключ. Есть класс SpacyDenseEmbedding, но тут даже сами разработчики предупреждают, что это не серьёзно. Но, т.к. весь этот материал — скорее эксперимент, чем production ready продукт, воспользуемся именно SpacyDenseEmbedding. Создадим экземпляры:
from vechord.chunk import SpacyChunkerfrom vechord.embedding import SpacyDenseEmbedding@lru_cachedef get_chunker() -> SpacyChunker: ch = SpacyChunker(model='ru_core_news_md') return ch@lru_cachedef get_embedder() -> SpacyDenseEmbedding: emb = SpacyDenseEmbedding(model='ru_core_news_md') return emb_chunker = get_chunker()_embedder = get_embedder()
Проверим работоспособность пайплайна. Перепишем наш main.py:
from services import DocumentServicefrom schemas.document import DocumentCreateasync def main(): async with vr: await DocumentService.create_document( DocumentCreate(title='Тестовый документ', text="""Длинный текст документа"""))
Смотрим результат

Отлично. Документ загрузился, поделился на чанки, для каждого чанка получен вектор и какие-то ключевые слова. Пайплайн работает. Однако в текущем виде он только пишет в БД. Хорошим тоном является возвращать созданный объект (ну или как минимум его первичный ключ). Реализуем. И как всегда начинаем со схемы ответа:
# файл schemas/document.pyclass DocumentResponse(DocumentBase): uid: Annotated[UUID, Field(..., description='Идентификатор документа')] created_at: Annotated[ datetime, Field(..., description='Дата и время создания документа')] updated_at: Annotated[ datetime, Field(..., description='Дата и время последнего обновления документа')] model_config = ConfigDict(from_attributes=True)
Схема есть. Теперь напишем финальный шаг для пайплайна создания:
# файл services/utils.py@vr.inject(input=Document)async def get_document(uid: UUID) -> DocumentResponse | None: docs = await vr.select_by(Document.partial_init(uid=uid)) return DocumentResponse.model_validate(docs[0]) if docs else None
Не понял, баг это или фича, но сразу получить объект класса Document нельзя — @vr.inject позволяет читать только конкретные поля таблицы, переданной в параметре input. Поэтому берём uid, вручную читаем объект с БД и возвращаем его (либо None, если что-то пошло не так).
Теперь обновление. В документации и коде явного update метода я не нашел. Поэтому лютый колхоз. Пишем update модель
# файл schemas/document.pyclass DocumentUpdate(BaseModel): title: Annotated[str | None, Field(None, description='Заголовок документа')] text: Annotated[str | None, Field(None, description='Содержимое документа')] metadata: Annotated[ dict[str, Any], Field(default_factory=dict, description='Метаданные документа')] def is_empty(self) -> bool: return all([ self.title is None, self.text is None, not self.metadata ])
Это та же модель создания документа, но все поля опциональны. Метод is_empty добавлен, чтобы проще валидировать наличие данных в модели. Теперь реализуем саму функцию update:
# файл services/utils.py@vr.inject(output=Document)async def update_document(uid: UUID, doc_data: DocumentUpdate) -> Document: docs = await vr.select_by(Document.partial_init(uid=uid)) if not docs: raise ValueError(f'Document {uid} not found') doc = docs[0] new_doc = Document( title=doc_data.title if doc_data.title is not None else doc.title, text=doc_data.text if doc_data.text is not None else doc.text, metadata=Jsonb(doc_data.metadata) if doc_data.metadata is not None else doc.metadata, updated_at=datetime.now(timezone.utc), created_at=doc.created_at, uid=doc.uid, ) await vr.remove_by(Document.partial_init(uid=uid)) return new_doc
При обновлении мы должны сохранить ID и дату создания. Поэтому, получаем документ, создаём новый документ с частями старого (время создания и uid) и удаляем старый. При удалении старого документа вместе с ним удалятся и его чанки (из-за ON DELETE CASCADE при объявлении поля vechord.spec.ForeignKey). Далее новый документ записывается в базу данных. Теперь пайплайн
class DocumentService: # предыдущий код @staticmethod async def update_document(uid: UUID, doc_data: DocumentUpdate) -> DocumentResponse | None: pipeline = vr.create_pipeline([update_document, create_chunks, get_document]) return await pipeline.run(uid=uid, doc_data=doc_data)
Дополним наш main.py строчкой:
# Где-то после создания документаawait DocumentService.update_document( uid=UUID(<UUID созданного документа>), doc_data=DocumentUpdate(title='Изменённый текстовый документ'))
Проверим:

Работает — время создания документа старое, а время обновления и чанки — новые. Добиваем наш сервис методами получения/удаления в виде простой прослойки и переходим к поиску
class DocumentService: # предыдущий код @staticmethod async def get_document(uid: UUID) -> DocumentResponse | None: docs = await vr.select_by(Document.partial_init(uid=uid)) return DocumentResponse.model_validate(docs[0]) if docs else None @staticmethod async def list_documents(limit: int | None = None) -> list[DocumentResponse]: docs = await vr.select_by(Document.partial_init(), limit=limit) return [DocumentResponse.model_validate(doc) for doc in docs] @staticmethod async def delete_document(uid: UUID) -> None: await vr.remove_by(Document.partial_init(uid=uid))
Сервис реализации поиска
По традиции, начнем реализацию поиска со схем:
# файл schemas/search.pyclass SearchBase(BaseModel): query: Annotated[str, Field(..., description='Поисковый запрос')] topk : Annotated[ int, Field(10, ge=1, description='Количество результатов для возврата')]class VectorSearchRequest(SearchBase): probe: Annotated[ int | None, Field(None, description='Сколько кластеров K-means нужно проверить')]class KeywordSearchRequest(SearchBase): pass
В схемах поисковых запросов просто повторяем параметры соответствующих методов VechordRegistry, за одним исключением — векторизовать запросы мы будем уже в нашем сервисе, тем же эмбеддером, которым получали векторы чанков. Поэтому поле запроса — строка.
Теперь схема возврата результата:
# файл schemas/search.pyclass ChunkResponse(BaseModel): uid: Annotated[UUID, Field(..., description='Идентификатор чанка')] doc_id: Annotated[ UUID, Field(..., description='Идентификатор документа')] content: Annotated[str, Field(..., description='Содержимое чанка')] created_at: Annotated[ datetime, Field(..., description='Дата и время создания чанка')] updated_at: Annotated[ datetime, Field(..., description='Дата и время последнего обновления чанка')] model_config = ConfigDict(from_attributes=True)
Теперь реализуем сервис поиска. Начнем реализацию с базовых методов поиска — векторного и полнотекстового:
# файл services/search.pyclass SearchService: @staticmethod async def vector_search(request: VectorSearchRequest) -> list[ChunkResponse]: vector = await _embedder.vectorize_chunk(request.query) results = await vr.search_by_vector( Chunk, vec=vector, topk=request.topk, probe=request.probe) return [ChunkResponse.model_validate(r) for r in results] @staticmethod async def keyword_search(query: KeywordSearchRequest) -> list[ChunkResponse]: results = await vr.search_by_keyword(Chunk, keyword=query.keyword, topk=query.topk) return [ChunkResponse.model_validate(r) for r in results]
Для векторного поиска сначала получаем вектор запроса. Ну а в остальном, методы просто передают параметры в методы класса VechordRegistry.
Ну и самое интересное — гибридный поиск
Гибридный поиск
Гибридный поиск обеспечивает высокую полноту выдачи (recall), объединяя семантический и ключевой поиск. Это позволяет находить документы как по смыслу, так и по точным совпадениям слов, компенсируя слабые стороны каждого метода в отдельности. Однако, методы первого этапа часто жертвуют точностью ради скорости. Здесь на помощь приходит реранкинг: финальный отбор лучших. Модель оценивает контекстное соответствие запроса и документа, исправляя ошибки первого этапа. Таким образом система сочетает скорость поиска по большим данным с точностью понимания смысла, выводя на первые позиции только то, что действительно нужно пользователю.
Библиотека vechord предоставляет свои реранкеры. Но, как и в случае с эмбеддерами, CohereReranker и JinaReranker требуют API ключи. Остаётся простейший ReciprocalRankFusion, названый так в честь используемого алгоритма. Название можно перевести как “Объединение обратных рангов”, и оно фактически описывает алгоритм — итоговый ранг документа формируется путём суммирования обратных величин его позиций в результатах поиска с добавлением константы для сглаживания влияния крайних значений. В итоге, чем выше документ в отдельных списках и чем чаще он встречается на верхних позициях, тем выше его итоговый ранг:
где — итоговый балл документа
,
— количество списков результатов,
— позиция документа
в
-м списке,
— константа (обычно 60).
Перейдём от теории к практике. Для начала реализуем модель запроса
class HybridSearchRequest(VectorSearchRequest, KeywordSearchRequest): boost: Annotated[ int, Field(3, ge=1, description='Коэффициент расширения выборки')]
Параметр boost нужен для повышения итоговой релевантности, т.к. если передать реранкеру ровно topk документов, то ему не из чего выбирать — он просто переставит местами уже отобранные документы, не улучшив качество выборки. Далее сам метод
class SearchService: # предыдущий код @staticmethod async def hybrid_search_fuse(request: HybridSearchRequest) -> list[ChunkResponse]: rrf = ReciprocalRankFusion() vector = await _embedder.vectorize_chunk(request.query) return rrf.fuse( [ await vr.search_by_vector( Chunk, vec=vector, topk=request.topk * request.boost, probe=request.probe), await vr.search_by_keyword(Chunk, keyword=request.keyword, topk=request.topk * request.boost) ] )[:request.topk]
С поиском как-бы всё.
API
API расписывать подробно не буду, т.к. фактически его копируем из репозитория (который git) для первой статьи и заменяем вызов методов репозитория (который паттерн) на вызов соответствующих методов сервиса. Отметить стоит, что из-за особенности VechordPipeline последний метод пайплайна возвращает список значений, поэтому в API для создания документа мы используем:
async def create_document(data: DocumentCreate): res = await DocumentService.create_document(data) return res[0]
Остался main.py. Он стандартный для FastAPI приложения, прокомментирую только метод lifespan:
@asynccontextmanagerasync def lifespan(app: FastAPI): async with vr: yield
В lifespan мы открываем пул соединений нашего VechordRegistry, который будет работать, пока работает приложение.
Внезапное окончание
Что имеем на текущий момент:
-
асинхронная работа с PostgreSQL через
VechordRegistry -
автоматическое разбиение документов на чанки (
SpacyChunker) и получение эмбеддингов (SpacyDenseEmbedding) -
полнотекстовый, векторный и гибридный поиск с реранкингом (
ReciprocalRankFusion) -
базовый API для документов с каскадным обновлением чанков
В текущей реализации эмбеддер и чанкер — игрушечные (из библиотеки spacy). Для боевого использования стоит заменить их на более серьёзные модели. В следующей части как раз этим и займемся — реализуем наследование от базовых классов, используя различные варианты.
Код проекта доступен тут.
ссылка на оригинал статьи https://habr.com/ru/articles/1024810/