Алгоритмы векторного поиска: IVF и HNSW

от автора

О чем эта статья?

В данной статье я хочу пройтись по двум самым популярным алгоритмам векторного поиска, используемым на практике. Попробуем понять, почему точный поиск не работает в высоких размерностях и почему мы в итоге приходим к приближенному поиску.

Заодно мы затронем тему метрик, чтобы понять, как вообще сравнивают эмбеддинги. Рассмотрим вспомогательный и очень простой алгоритм k-means из классического ML’а, лежащий в основе IVF.

И наконец, подробно разберем два самых главных алгоритма IVF и HNSW с примерами их реализации на Python’е.

Curse of Dimensionality

Введение

Для начала нам нужно понять, почему мы не можем просто взять и воспользоваться каким-нибудь стандартным алгоритмом вроде kd-tree для точного поиска и почему мы в итоге приходим к приближенному поиску. Оказывается, все дело в штуке, которая называется проклятием размерности (curse of dimensionality).

С ростом числа измерений пространство растет экспоненциально в том смысле, что для сохранения той же плотности точек требуется экспоненциально больше данных. Это означает, что данные становятся более разреженными. А это в свою очередь приводит к тому, что контраст между расстояниями начинает размываться с увеличением числа размерностей. В высоких размерностях расстояние до ближайшей и самой далекой точки начинает становиться почти одинаковым. Это явление называется concentration of distances.

\frac{d_{max} - d_{min}}{d_{min}} \to 0

Интуиция

Представим, что у нас постепенно растет число измерений:

  • 1D — мы заполняем точками одну прямую

  • 2D — мы заполняем точками квадрат

  • 3D — мы заполняем точками куб

В общем случае, в d-мерном пространстве, у нас гиперкуб. С увеличением размерности нам нужно все больше и больше точек для заполнения этого пространства.

К каким проблемам это приводит?

  1. Data sparsity. Данные становятся сильно разреженными, то есть, большая часть пространства остается пустой. Это усложняет задачи кластеризации, классификации и поиска.

  2. Increased computation. Большее число измерений требует больше вычислений и времени на обработку данных.

  3. Overfitting. Модели могут начинать подстраиваться под шум в данных, а не под реальную закономерность. Это снижает способность модели обобщать результаты на новые данные.

  4. Distances lose meaning. Различия между расстояниями точек данных становятся незначительными, из-за чего такие метрики, как евклидово расстояние, теряют информативность.

  5. Performance degradation. Алгоритмы, которые опираются на измерение расстояний (например, k-means), могут работать хуже.

В зависимости от задачи, мы можем по-разному подходить к решению данных проблем, начиная от классического PCA, заканчивая автоэнкодерами.

Но здесь мы рассмотрим проблемы, возникающие в задачах поиска в векторном пространстве.

Exact Nearest Neighbor

Exact nearest neighbor search пытается гарантированно найти самый близкий объект. Чтобы сделать это быстро, индекс должен уметь надежно отсекать большие части пространства (pruning).

Но в высоких размерностях:

  • надежно отсечь становится трудно

  • слишком много кандидатов выглядят почти одинаково

  • в итоге exact indexes теряют эффективность

Например, kd-tree пытается сказать:

  • все точки в этом поддереве слишком далеко

  • в эту ветвь дерева можно даже не смотреть

Но, в высоких размерностях:

  • bounding regions начинают сильно пересекаться

  • чтобы не потерять правильный ответ, приходится проверять слишком много ветвей

  • индекс по факту начинает вести себя почти как полный перебор

Approximate Nearest Neighbor

Вместо гарантии идеального ответа мы соглашаемся на очень хороший approximate answer, нам достаточно с высокой вероятностью найти очень хороших кандидатов.

Теперь индексу не нужно доказывать:

  • что все отброшенное точно бесполезно

Ему достаточно:

  • быстро найти область, где скорее всего лежат хорошие соседи

То есть exact pruning заменяется на heuristic candidate generation.

Метрики (L2, Inner Product, Cosine Similarity)

Введение

Мы поняли, что нужно искать вектора приближенными методами. Теперь нам нужно понять, как именно мы эти вектора будем сравнивать между собой. Тут нужно понимать, что у нас не просто вектора, а эмбеддинги, несущие в себе смысловую нагрузку. А ведь нам нужно искать максимально близкие по смыслу вектора.

В качестве меры схожести эмбеддингов чаще всего используют следующие метрики:

  • L2

  • Inner Product

  • Cosine Similarity

Выбор метрики зависит от того, какую геометрию пространства формирует модель эмбеддингов во время обучения. Например, для текстовых эмбеддингов чаще всего обычно используют Cosine Similarity, эта метрика зависит только от направления вектора и игнорирует его длину. А для каких-нибудь рекомендательных систем модель могут обучать так, что длина вектора тоже несет полезный сигнал, например, силу или уверенность предсказания.

L2 — евклидово расстояние

d(x, y) = \sqrt{\sum_{\substack{i}}(x_i - y_i)^2}

иногда используется squared L2

d(x, y) = \sum_{\substack{i}}(x_i - y_i)^2

потому что она дает тот же порядок ближайших соседей, но при этом вычисляется быстрее.

​Интуиция

L2 отвечает на вопрос:

“Насколько далеко находятся две точки в пространстве?”

Чем меньше расстояние, тем объекты ближе.

Что важно

На L2 влияют сразу две вещи:

  • направление

  • длина вектора

То есть даже если два вектора смотрят в одну сторону, но один намного длиннее другого, L2 может быть большим.

Inner Product

x \cdot y = \sum_{\substack{i}}(x_i y_i)

Интуиция

Inner Product большой, когда:

  • векторы направлены в похожую сторону

  • у них большие длины

То есть это не совсем “расстояние”. Это скорее мера согласованности двух векторов.

Что важно

Inner Product чувствителен к норме вектора.

Если один вектор просто “длиннее”, Inner Product может стать больше даже при том же направлении.

Cosine Similarity

\cos(x, y) = \frac{x \cdot y}{\lvert x \rvert \lvert y \rvert}

Интуиция

Cosine Similarity отвечает на вопрос:

“Насколько векторы смотрят в одну сторону?”

Она игнорирует длину и смотрит только на направление.

Диапазон

Обычно:

  • 1 — полностью совпадающее направление

  • 0 — ортогональны

  • -1 — полностью противоположные направления

На практике в эмбеддингах часто значения в основном положительные или около нуля, но не всегда.

Что важно

Cosine Similarity не зависит от масштаба.

Если, например, умножить вектор на 10, Cosine Similarity с другим вектором не изменится.

Связь между Cosine Similarity и Inner Product

Если все векторы нормализованы к единичной длине:

\lvert x \rvert = \lvert y \rvert = 1

тогда:

x \cdot y = \cos(x, y)

То есть Inner Product и Cosine Similarity становятся эквивалентны.

Связь между Cosine Similarity и L2

Если векторы нормализованы, между Cosine Similarity и L2 тоже есть простая связь.

\lvert x - y \rvert^2 = 2 - 2\cos(x, y)

Это значит:

  • чем больше косинус

  • тем меньше L2 distance

и наоборот.

Геометрическая интерпретация

Представим, что все векторы после нормализации лежат на поверхности единичной гиперсферы.

Тогда:

  • Cosine Similarity — это “насколько маленький угол между ними”

  • Inner Product — то же самое, потому что длины уже равны 1

  • L2 distance — это длина хорды между двумя точками на сфере

А на гиперсфере:

  • маленький угол

  • большой cosine

  • маленькая хорда

все это описывает одну и ту же близость, просто в разных координатах.

k-means

Введение

Прежде чем мы приступим к разбору IVF, нам нужно разобраться в k-means, потому что он по сути является базовым кирпичиком на котором строится IVF и без понимания того, как работает k-means будет сложно разобраться в IVF.

Данный алгоритм применяется для кластеризации данных в задаче обучения без учителя (unsupervised learning). На вход алгоритму подаются неразмеченные данные (unlabeled data), а он должен попытаться сгруппировать их в k заранее заданное количество кластеров. В качестве метрики схожести чаще всего используется евклидово расстояние.

Описание алгоритма

Сам алгоритм, довольно простой, вот описание шагов, из которых оно состоит:

  1. Мы выбираем случайные k точек из набора входных данных. Это будут нашими начальными центроидами.

  2. Пробегаемся по всем точкам, смотрим, к какой из центроид она ближе всего. В итоге у нас получится k кластеров.

  3. Внутри каждого кластера выбираем новую центроиду, вычисляя mean точку из набора точек данного кластера.

  4. Заново пробегаемся по всем точкам и снова определяем, к какому кластеру оно теперь относится.

  5. Если у нас за итерацию не было изменений, значит алгоритм сошелся и мы заканчиваем. Либо, мы достигли заранее определенного лимита итераций.

Реализация алгоритма на Python
import mathimport randomfrom dataclasses import dataclassMAX_STEPS = 100@dataclassclass Point2D:    x: float    y: floatdef distance(point_1: Point2D, point_2: Point2D) -> float:    return math.sqrt((point_1.x - point_2.x) ** 2 + (point_1.y - point_2.y) ** 2)def assign_points(points: list[Point2D], centroids: dict[int, Point2D]) -> list[int]:    clusters = [0 for _ in range(len(points))]    for p_index in range(len(points)):        min_dist    = None        min_cluster = None        for cluster, centroid in centroids.items():            dist = distance(points[p_index], centroid)            if min_dist is None or dist < min_dist:                min_dist    = dist                min_cluster = cluster        clusters[p_index] = min_cluster    return clustersdef update_centroids(points: list[Point2D], clusters: list[int], centroids: dict[int, Point2D]) -> dict[int, Point2D]:    updated_centroids = {}    for cluster, _ in centroids.items():        mean_x = 0        mean_y = 0        mean_n = 0        for p_index in range(len(points)):            if cluster == clusters[p_index]:                mean_x += points[p_index].x                mean_y += points[p_index].y                mean_n += 1        if mean_n > 0:            updated_centroids[cluster] = Point2D(mean_x / mean_n, mean_y / mean_n)        else:            updated_centroids[cluster] = centroids[cluster]    return updated_centroidsdef k_means(k: int, points: list[Point2D]) -> tuple[dict[int, Point2D], list[int]]:    clusters = [0 for _ in range(len(points))]    # initialize centroids    centroids = {cluster: points[p_index] for cluster, p_index in enumerate(random.sample(range(len(points)), k), 1)}    step = 0    while step < MAX_STEPS:        updated_clusters = assign_points(points, centroids)        if clusters == updated_clusters:            return centroids, updated_clusters        clusters  = updated_clusters        centroids = update_centroids(points, clusters, centroids)        step += 1    return centroids, clusterspoints = [Point2D(random.random(), random.random()) for _ in range(100)]centroids, clusters = k_means(k=10, points=points)print('Clusters:', clusters)

Inverted File Index (IVF)

Введение

IVF (Inverted File Index) — это один из самых популярных и простых алгоритмов приближенного поиска ближайших соседей (ANN) для векторных баз данных, таких как Qdrant, PGVector, а также это один из основных алгоритмов, используемых в популярной библиотеке FAISS. Он ускоряет поиск, разбивая пространство векторов на кластеры и выполняя поиск только в наиболее релевантных кластерах, а не во всей базе.

Описание алгоритма

Общая схема работы алгоритма у нас будет состоять из двух этапов:

  1. Построение IVF индекса.

    1.1. Запускаем на всей базе векторов алгоритм k-means и получаем nlist кластеров.

    1.2. Каждый вектор назначается ближайшей центроиде.

    cluster_1 -> [vector_3, vector_1, vector_17, ...]cluster_2 -> [vector_2, vector_5, vector_15, ...]cluster_3 -> [vector_4, vector_9, vector_23, ...]...

    каждый такой список векторов называется inverted list, а совокупность этих списков образует inverted file.

  2. Поиск top_k ближайших векторов до вектора query.

    2.1. Допустим, теперь нам на вход дают вектор query и нам нужно найти top_k ближайших до него векторов.

    2.2. Находим nprobe ближайших центроид до вектора query. Таким образом, у нас теперь nprobe кластеров для дальнейшего поиска.

    2.3. Ищем среди найденных nprobe кластеров top_k ближайшие вектора до вектора query.

    2.4. Возвращаем эти top_k ближайшие вектора в отсортированном по расстоянию до вектора query порядке.

Ключевые параметры

nlist — общее количество кластеров.

nprobe — количество кластеров, в которых мы производим наш поиск.

Увеличение nprobe повышает recall, но делает поиск медленнее.

Реализация алгоритма на Python
import mathimport randomfrom dataclasses import dataclasstype IVFIndex = dict[int, list[int]]MAX_STEPS = 100@dataclassclass Point2D:    x: float    y: floatdef distance(point_1: Point2D, point_2: Point2D) -> float:    return math.sqrt((point_1.x - point_2.x) ** 2 + (point_1.y - point_2.y) ** 2)def assign_points(points: list[Point2D], centroids: dict[int, Point2D]) -> list[int]:    clusters = [0 for _ in range(len(points))]    for p_index in range(len(points)):        min_dist    = None        min_cluster = None        for cluster, centroid in centroids.items():            dist = distance(points[p_index], centroid)            if min_dist is None or dist < min_dist:                min_dist    = dist                min_cluster = cluster        clusters[p_index] = min_cluster    return clustersdef update_centroids(points: list[Point2D], clusters: list[int], centroids: dict[int, Point2D]) -> dict[int, Point2D]:    updated_centroids = {}    for cluster, _ in centroids.items():        mean_x = 0        mean_y = 0        mean_n = 0        for p_index in range(len(points)):            if cluster == clusters[p_index]:                mean_x += points[p_index].x                mean_y += points[p_index].y                mean_n += 1        if mean_n > 0:            updated_centroids[cluster] = Point2D(mean_x / mean_n, mean_y / mean_n)        else:            updated_centroids[cluster] = centroids[cluster]    return updated_centroidsdef k_means(k: int, points: list[Point2D]) -> tuple[dict[int, Point2D], list[int]]:    clusters = [0 for _ in range(len(points))]    # initialize centroids    centroids = {cluster: points[p_index] for cluster, p_index in enumerate(random.sample(range(len(points)), k), 1)}    step = 0    while step < MAX_STEPS:        updated_clusters = assign_points(points, centroids)        if clusters == updated_clusters:            return centroids, updated_clusters        clusters  = updated_clusters        centroids = update_centroids(points, clusters, centroids)        step += 1    return centroids, clustersdef build_ivf(nlist: int, points: list[Point2D]) -> tuple[dict[int, Point2D], IVFIndex]:    centroids, clusters = k_means(nlist, points)    inv_file = {cluster: [] for cluster in centroids}    for p_index, cluster in enumerate(clusters):        inv_file[cluster].append(p_index)    return centroids, inv_filedef search_ivf(    nprobe:    int,    top_k:     int,    query:     Point2D,    points:    list[Point2D],    centroids: dict[int, Point2D],    inv_file:  IVFIndex,) -> list[Point2D]:    closest_clusters = sorted(centroids, key=lambda cluster: distance(centroids[cluster], query))    closest_clusters = closest_clusters[:nprobe]    closest_points = [p_index for cluster in closest_clusters for p_index in inv_file[cluster]]    closest_points = sorted(closest_points, key=lambda p_index: distance(points[p_index], query))    closest_points = closest_points[:top_k]    return [points[p_index] for p_index in closest_points]points = [Point2D(random.random(), random.random()) for _ in range(100)]top_k = 3query = Point2D(random.random(), random.random())centroids, inv_file = build_ivf(nlist=10, points=points)closest_points = search_ivf(nprobe=5, top_k=top_k, query=query, points=points, centroids=centroids, inv_file=inv_file)print('Query:', query)print('Closest points:', closest_points)

Оптимизация

Хорошо, мы смогли отбросить не очень перспективные кластера и значительно сузить круг поиска. Но! В оставшихся nprobe кластерах нам придется применять все тот же brute-force поиск. Такой способ обычно, например, в том же FAISS обозначают как IVF + Flat.

Этап начального отсеивания кандидатов называется coarse quantization. После, вместо тупого brute-force нам хотелось бы иметь более оптимальный метод. И такие методы, к счастью, существуют и далее мы рассмотрим один из них, являющийся в каком-то смысле базовым вариантом, он называется Product Quantization и обозначается как IVF + PQ.

Product Quantization (PQ)

PQ — метод позволяющий сжимать вектора, позволяя таким образом сильно экономить память и ускорять вычисление расстояний используя хитрые оптимизации.

Основная идея

Мы берем наши вектора и делим их на подпространства:

x = (x^{(1)}, x^{(2)}, ..., x^{(m)})

где

x \in \mathbb{R}^nx^{(i)} \in \mathbb{R}^{n/m}

То есть, грубо говоря, допустим у нас есть вектор размерностью 128, мы можем разделить его на 8 равных частей, каждый из кусков теперь будет представлять вектор размерностью 16.

Далее, в каждом из подпространств мы запускаем хорошо уже знакомый нам алгоритм k-means. В итоге, мы получаем набор центроид, который мы в дальнейшем будем называть codebook’ом:

codebook_i = (c_1, c_2, ..., c_k)

где

i - \text{индекс подпространства} \\c_j - \text{центроида с индексом } j

Теперь, вместо нашего исходного вектора мы будем хранить индексы ближайших центроид из codebook’а соответствующего подпространства:

(x^{(1)}, x^{(2)}, ..., x^{(m)}) \to (j_x^{(1)}, j_x^{(2)}, ..., j_x^{(m)})

где

j_x^{(i)} - \text{индекс центроиды в подпространстве } i \text{ вектора } x

Вернемся к примеру нашего вектора размерностью 128, который мы разделили на 8 равных частей. Теперь, каждый вектор подпространства размерностью 16 будет заменен на индекс центроиды:

\underbrace{[0.52, 1.45, ..., 0.32]}_{128} \to \underbrace{[5, 12, ..., 25]}_{8}

Вычисление расстояния

Теперь представим, что к нам прилетел вектор q и нам нужно уметь вычислять расстояние от вектора q до сжатого нашим PQ вектора x.

q \in \mathbb{R}^n

Первое что приходит на ум, это сжать вектор q тем же самым PQ и вычислить расстояние между двумя квантованными векторами. Как это сделать? Очень просто! Мы считаем расстояния между соответствующими центроидами и просто их суммируем.

q \to (c_q^{(1)}, c_q^{(2)}, ..., c_q^{(m)})x \to (c_x^{(1)}, c_x^{(2)}, ..., c_x^{(m)})

где

c_q^{(i)} - \text{центроида в подпространстве } i \text{ вектора } qc_x^{(i)} - \text{центроида в подпространстве } i \text{ вектора } x

Искомое расстояние:

dist(q, x) = \sum_{\substack{i}}\left\|c_q^{(i)} - c_x^{(i)}\right\|^2

Причем, раз у нас центроиды фиксированы, мы заранее можем посчитать все расстояния и свести вычисление расстояния к поиску по таблице.

T^{(i)}[j][k] = \left\|c_j^{(i)} - c_k^{(i)}\right\|^2

Звучит так, будто это очень быстро! Так и есть, но тут есть один существенный минус, мы теряем в точности из-за двойного квантования.

Вышеописанный метод, кстати говоря, называется Symmetric Distance Computation (SDC).

Окей, мы не хотим терять в точности и быть такими же быстрыми и ловкими. Можем? Можем!

Разобъем вектор q на подпространства, но не будем заменять их на индексы из codebook’а. Вместо этого мы возьмем кусок вектора q без изменений и вычислим расстояние от него до центроиды вектора x из нашей базы. И сумма всех этих расстояний и будет нашим искомым расстоянием. Это будет точнее, за счет того, что мы не “портим” вектор q квантованием.

dist(q, x) = \sum_{\substack{i}}\left\|q^{(i)} - c_x^{(i)}\right\|^2

Для каждого запроса мы можем посчитать расстояния до всех центроид и свести вычисление расстояния к поиску по таблице.

T^{(i)}[j] = \left\|q^{(i)} - c_j^{(i)}\right\|^2

Этот метод называется Asymmetric Distance Computation (ADC). В современных системах (в том же FAISS) чаще используется именно ADC за счет лучшего баланса точности и скорости.

Hierarchical Navigable Small World (HNSW)

Введение

HNSW (Hierarchical Navigable Small World) — данный алгоритм приближенного поиска ближайших соседей на сегодняшний день стал фактически стандартом для многих векторных баз данных, таких как Qdrant и PGVector. Реализация этого алгоритма также присутствует в популярной библиотеке FAISS. Все это благодаря тому, что он сочетает в себе высокую скорость поиска и одновременно высокий recall.

Суть алгоритма состоит в том, чтобы построить граф, где каждый вектор является вершиной, а ребра соединяют его с несколькими ближайшими к нему векторами. Особенность данного алгоритма состоит в том, что у нас есть набор слоев, где самый верхний слой содержит меньше всего вершин, а самый нижний содержит абсолютно все вершины. Каждый слой, который находится уровнем ниже, содержит все вершины вышележащего, плюс некоторое дополнительное количество вершин.

Такая структура позволяет на начальных этапах делать дальние переходы в нужную область пространства, а затем, опускаясь ниже слой за слоем, выполнять более точный поиск среди близких кандидатов. Это похоже на навигацию по карте: сначала мы ищем нужный город, затем район в этом городе, потом улицу, и уже после этого конкретный дом.

Описание алгоритма

Общая схема работы алгоритма у нас будет состоять из двух этапов:

  1. Построение HNSW индекса.

    1.1. Если наш граф еще пустой, то первый добавленный вектор мы назначим стартовой точкой входа, которую назовем entry point.

    1.2. Для каждого нового вектора случайным образом выбираем максимальный уровень max_level.

    1.3. Допустим, мы добавляем новый вектор vector_new, для которого был выбран максимальный уровень max_level.

    1.4. Начинаем поиск места для вставки с текущей entry point на самом верхнем уровне графа.

    1.5. На уровнях выше max_level мы просто жадно спускаемся ближе к нужной области пространства.

    То есть на каждом уровне мы переходим к соседней вершине, если она находится ближе к vector_new, чем текущая вершина. Повторяем это до тех пор, пока среди соседей текущей вершины нет более близкой вершины. После этого спускаемся на уровень ниже.

    1.6. Когда мы доходим до max_level, начинаем вставку нашего вектора vector_new в наш граф.

    1.7. Для каждого уровня от max_level до самого нижнего уровня выполняем поиск ef_construction ближайших кандидатов до вектора vector_new.

    1.8. Из найденных кандидатов выбираем не больше M соседей, с которыми будет соединен вектор vector_new.

    1.9. После выбора соседей добавляем ребра между vector_new и выбранными вершинами.

    1.10. Если после добавления нового ребра у какой-то существующей вершины количество соседей стало больше M, то список ее соседей нужно сократить.

    1.11. После вставки вектора на текущем уровне алгоритм спускается на уровень ниже и повторяет те же действия: ищет кандидатов с помощью ef_construction, выбирает до M соседей и добавляет ребра.

    1.12. Если max_level оказался выше текущего максимального уровня графа, то vector_new становится новой entry point.

  2. Поиск top_k ближайших векторов до вектора query.

    2.1. Допустим, теперь нам на вход дают вектор query и нам нужно найти top_k ближайших до него векторов.

    2.2. Начинаем поиск с текущей entry point на самом верхнем уровне графа.

    2.3. На всех уровнях кроме самого нижнего выполняем жадный поиск.

    2.4. На каждом уровне мы смотрим соседей текущей вершины и переходим к тому соседу, который находится ближе к вектору query.

    Если такой сосед найден, он становится новой текущей вершиной, и мы снова проверяем уже его соседей. Так продолжается до тех пор, пока среди соседей текущей вершины нет вершины ближе к query.

    После этого алгоритм спускается на уровень ниже.

    2.5. Когда мы доходим наконец до самого нижнего уровня, начинаем более подробный поиск среди кандидатов.

    2.6. На нижнем уровне находим до ef_search ближайших кандидатов до нашего вектора query.

    2.7. Ищем среди найденных ef_search кандидатов top_k ближайшие вектора до вектора query.

    2.8. Возвращаем эти top_k ближайшие вектора в отсортированном по расстоянию до вектора query порядке.

Ключевые параметры

M — максимальное количество ребер, которое может иметь вершина на одном уровне графа.

ef_construction — размер списка кандидатов, который используется при построении индекса.

ef_search — размер списка кандидатов, который используется во время поиска.

Увеличение ef_search повышает recall, но делает поиск медленнее.

Увеличение M обычно повышает качество поиска, но увеличивает размер индекса в памяти.

Реализация алгоритма на Python
import heapqimport mathimport randomfrom dataclasses import dataclass, fieldtype HNSWGraph = dict[int, dict[int, list[int]]]MAX_LEVEL = 16@dataclassclass Point2D:    x: float    y: float@dataclassclass HNSWIndex:    graph:       HNSWGraph = field(default_factory=dict)    levels:      dict[int, int] = field(default_factory=dict)    entry_point: int | None = None    max_level:   int = 0def distance(point_1: Point2D, point_2: Point2D) -> float:    return math.sqrt((point_1.x - point_2.x) ** 2 + (point_1.y - point_2.y) ** 2)def random_level(level_prob: float = 0.5) -> int:    level = 0    while random.random() < level_prob and level < MAX_LEVEL:        level += 1    return leveldef greedy_search(query: Point2D, points: list[Point2D], graph: HNSWGraph, entry_point: int, level: int) -> int:    current      = entry_point    current_dist = distance(points[current], query)    changed = True    while changed:        changed = False        for neighbor in graph.get(level, {}).get(current, []):            neighbor_dist = distance(points[neighbor], query)            if neighbor_dist < current_dist:                current      = neighbor                current_dist = neighbor_dist                changed = True    return currentdef search_layer(    query:       Point2D,    points:      list[Point2D],    graph:       HNSWGraph,    entry_point: int,    level:       int,    ef:          int,) -> list[int]:    entry_dist = distance(points[entry_point], query)    candidates = [( entry_dist, entry_point)]    closest    = [(-entry_dist, entry_point)]    visited = set()    visited.add(entry_point)    while candidates:        current_dist, current =  heapq.heappop(candidates)        worst_dist            = -closest[0][0]        if current_dist > worst_dist and len(closest) >= ef:            break        for neighbor in graph.get(level, {}).get(current, []):            if neighbor in visited:                continue            visited.add(neighbor)            neighbor_dist =  distance(points[neighbor], query)            worst_dist    = -closest[0][0]            if len(closest) < ef or neighbor_dist < worst_dist:                heapq.heappush(candidates, ( neighbor_dist, neighbor))                heapq.heappush(closest,    (-neighbor_dist, neighbor))                if len(closest) > ef:                    heapq.heappop(closest)    return [node for _, node in sorted([(-dist, node) for dist, node in closest])]def select_neighbors(query: Point2D, points: list[Point2D], candidates: list[int], M: int) -> list[int]:    return sorted(candidates, key=lambda p_index: distance(points[p_index], query))[:M]def add_edge(graph: HNSWGraph, level: int, from_node: int, to_node: int) -> None:    graph.setdefault(level, {})    graph[level].setdefault(from_node, [])    if to_node not in graph[level][from_node]:        graph[level][from_node].append(to_node)def shrink_neighbors(points: list[Point2D], graph: HNSWGraph, level: int, node: int, M: int) -> None:    neighbors = graph[level][node]    if len(neighbors) <= M:        return    graph[level][node] = sorted(neighbors, key=lambda neighbor: distance(points[node], points[neighbor]))[:M]def insert_hnsw(index: HNSWIndex, points: list[Point2D], new_point_index: int, ef_construction: int, M: int) -> None:    new_point           = points[new_point_index]    new_point_max_level = random_level()    index.levels[new_point_index] = new_point_max_level    for level in range(new_point_max_level + 1):        index.graph.setdefault(level, {})        index.graph[level].setdefault(new_point_index, [])    if index.entry_point is None:        index.entry_point = new_point_index        index.max_level   = new_point_max_level        return    entry_point = index.entry_point    for level in range(index.max_level, new_point_max_level, -1):        entry_point = greedy_search(            query       = new_point,            points      = points,            graph       = index.graph,            entry_point = entry_point,            level       = level,        )    for level in range(min(new_point_max_level, index.max_level), -1, -1):        candidates = search_layer(            query       = new_point,            points      = points,            graph       = index.graph,            entry_point = entry_point,            level       = level,            ef          = ef_construction,        )        neighbors = select_neighbors(            query      = new_point,            points     = points,            candidates = candidates,            M          = M,        )        for neighbor in neighbors:            add_edge(index.graph, level, new_point_index, neighbor)            add_edge(index.graph, level, neighbor, new_point_index)            shrink_neighbors(points, index.graph, level, neighbor, M)        shrink_neighbors(points, index.graph, level, new_point_index, M)        if candidates:            entry_point = candidates[0]    if new_point_max_level > index.max_level:        index.entry_point = new_point_index        index.max_level   = new_point_max_leveldef build_hnsw(points: list[Point2D], ef_construction: int, M: int) -> HNSWIndex:    index = HNSWIndex()    for p_index in range(len(points)):        insert_hnsw(            index           = index,            points          = points,            new_point_index = p_index,            ef_construction = ef_construction,            M               = M,        )    return indexdef search_hnsw(ef_search: int, top_k: int, query: Point2D, points: list[Point2D], index: HNSWIndex) -> list[Point2D]:    if index.entry_point is None:        return []    entry_point = index.entry_point    for level in range(index.max_level, 0, -1):        entry_point = greedy_search(            query       = query,            points      = points,            graph       = index.graph,            entry_point = entry_point,            level       = level,        )    candidates = search_layer(        query       = query,        points      = points,        graph       = index.graph,        entry_point = entry_point,        level       = 0,        ef          = ef_search,    )    closest_points = sorted(candidates, key=lambda p_index: distance(points[p_index], query))    closest_points = closest_points[:top_k]    return [points[p_index] for p_index in closest_points]points = [Point2D(random.random(), random.random()) for _ in range(100)]top_k = 3query = Point2D(random.random(), random.random())index = build_hnsw(points=points, ef_construction=20, M=5)closest_points = search_hnsw(ef_search=20, top_k=top_k, query=query, points=points, index=index)print('Query:', query)print('Closest points:', closest_points)

Заключение

IVF — очень простой в реализации и понимании, потребляет мало памяти, обеспечивает неплохую скорость поиска и довольно хороший recall, а самый главный его плюс на мой взгляд в том, что он очень хорошо масштабируется на большие коллекции.

HNSW — чуть сложнее в реализации и понимании, потребляет довольно много памяти, но выдает лучший recall при той же скорости поиска, есть мнение, что он плохо масштабируется на большие коллекции.

Мы довольно подробно рассмотрели два самых важных на мой взгляд алгоритма используемых в векторном поиске и надеюсь это поможет глубже понять то, как они устроены и в каких случаях стоит выбирать тот или иной алгоритм.

Спасибо за внимание, кто дочитал до конца, тот большой молодец!

ссылка на оригинал статьи https://habr.com/ru/articles/1046654/