Распределенная блокировка — очень удобный инструмент в кластере, который помогает обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису или запросу в данный момент времени. Так предотвращается гонка за данными и их неконсистентность. Распределенная (или кластерная) блокировка называется так потому, что она обеспечивается несколькими узлами, и выход из строя одного из них не повлияет на приложение. В этой статье я расскажу, как реализовать этот инструмент с помощью Tarantool 3.
Tarantool 3
Tarantool 3 — это новая версия in-memory базы данных, которая лежит в основе промежуточного ПО для хранения и обработки данных Tarantool. Она полностью совместима с файлами и репликацией второй версии.
Перечислю основные характеристики Tarantool 3 в сравнении с Tarantool 2:
-
Cервер с конфигом.
-
Ориентация на кластерность.
-
Удобная система триггеров.
-
Переопределение сетевого API (IPROTO).
-
Упрощение настройки репликации.
-
Имена узлов вместо UUID.
-
Расширенная статистика по потреблению памяти.
-
Значения полей по умолчанию.
Подробнее можно почитать в другой статье на Хабре
Принцип работы
Блокировки будут храниться в таблице (спейсе) с такой структурой:
|
name |
token |
expire |
Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.
Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable. Для автоматического переключения лидера настроим raft-фейловер.
Так будет выглядеть топология приложения.
Напишем хранимую процедуру на Lua для взятия блокировки. Процедура работает следующим образом:
-
Принимает параметры: имя и таймаут.
-
Проверяет, существует ли блокировка. Если нет — создает новую.
-
Если блокировка существовала, то проверяет, отпущена ли она (expire == 0).
-
Если отпущена, то наращивает token и устанавливает время expire.
-
Если уже была взята, процедура возвращает nil.
function _G.acquireLock(name, timeout) box.begin({ txn_isolation = "linearizable", timeout = timeout }) local lock = box.space.locks:get(name) if lock == nil then lock = { name, 0, clock.time64() + timeout * 1e9 } box.space.locks:insert(lock) box.commit() return lock end if lock['expire'] == 0 then box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 }, { "=", "expire", clock.time64() + timeout * 1e9 }) box.commit() return lock end box.commit() return nil end
Важно учесть, что наш алгоритм запущен в кластере и использует время. Для корректности работы важно держать время синхронизированным. Как это сделать.
Теперь напишем процедуру на Lua для отпускания блокировки. Процедура принимает параметры: имя и токен — и проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true, иначе — false.
function _G.releaseLock(name, token) box.begin({ txn_isolation = "linearizable" }) local lock = box.space.locks:get(name) if lock == nil then box.commit() return false end if lock['token'] == token and lock['expire'] ~= 0 then box.space.locks:update({ name }, { { "=", "expire", 0 } }) box.commit() return true end box.commit() return false end
Для создания таблицы (спейса) с блокировками используется фоновая процедура. Она:
-
Ждет, что узел станет лидером.
-
Создает таблицу с необходимыми полями.
-
Запускает цикл для обработки тайм-аутов блокировок.
fiber.create(function() fiber.name("expire-lock-fiber") box.ctl.wait_rw() box.schema.space.create("locks", { is_sync=true, if_not_exists=true}) box.space.locks:format({{name="name", type="string"}, {name='token', type='unsigned'}, {name='expire', type='unsigned'}}) box.space.locks:create_index('name', { parts={{field="name", type="string"}}, if_not_exists=true}) box.space.locks:create_index('expire', { parts={{field="expire", type="unsigned"}}, unique=false, if_not_exists=true}) while true do box.ctl.wait_rw() local now = clock.time64() for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do if t[3] < now then local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}}) if not rc then log.info(err) break end end end fiber.sleep(1) end end)
Инициализация приложения
Сначала инициализируем рабочее окружение для разработки:
tt init
Создадим директорию будущего приложения. Приложения располагаются в instances.enabled:
mkdir instances.enabled/app
Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера:
touch instances.enabled/app/config.yml
В конфигурации укажем:
-
Что в кластере должно быть три узла.
-
Параметры фейловера для работы кластера.
-
Включение mvcc-режима работы.
-
Файл с хранимыми процедурами.
# Настраиваем пользователей и их секреты credentials: users: client: password: "secret" replicator: password: "topsecret" roles: [replication] # Указываем под каким пользователем узлы Tarantool # будут подключаться друг к другу iproto: advertise: peer: login: replicator # Настраиваем raft failover для автоматического и # и ручного консистентного переключения лидера replication: failover: election # Использует мультиверсионный движок базы данных # для решения проблем грязных чтений database: use_mvcc_engine: true # Указываем исходный файл для создания API блокировок app: file: init.lua # Топология кластера # 3 узла объединённых репликацией groups: group-001: replicasets: replicaset-001: replication: bootstrap_strategy: config bootstrap_leader: instance-001 instances: instance-001: iproto: listen: - uri: 127.0.0.1:3301 instance-002: iproto: listen: - uri: 127.0.0.1:3302 instance-003: iproto: listen: - uri: 127.0.0.1:3303
Создадим файл, управляющий запуском локальных узлов кластера:
touch instances.enabled/app/instances.yaml
Укажем три узла для запуска:
instance-001: instance-002: instance-003:
Приложение на Lua
Создадим файл init.lua:
touch instances.enabled/app/init.lua
Полный листинг приложения:
local log = require('log') local fiber = require('fiber') local clock = require('clock') log.info("starting application") function _G.acquireLock(name, timeout) box.begin({ txn_isolation = "linearizable", timeout = timeout }) local lock = box.space.locks:get(name) if lock == nil then lock = { name, 0, clock.time64() + timeout * 1e9 } box.space.locks:insert(lock) box.commit() return lock end if lock['expire'] == 0 then box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 }, { "=", "expire", clock.time64() + timeout * 1e9 }) box.commit() return lock end box.commit() return nil end function _G.releaseLock(name, token) box.begin({ txn_isolation = "linearizable" }) local lock = box.space.locks:get(name) if lock == nil then box.commit() return false end if lock['token'] == token and lock['expire'] ~= 0 then box.space.locks:update({ name }, { { "=", "expire", 0 } }) box.commit() return true end box.commit() return false end fiber.create(function() fiber.name("expire-lock-fiber") box.ctl.wait_rw() box.schema.space.create("locks", { is_sync = true, if_not_exists = true }) box.space.locks:format({ { name = "name", type = "string" }, { name = 'token', type = 'unsigned' }, { name = 'expire', type = 'unsigned' } }) box.space.locks:create_index('name', { parts = { { field = "name", type = "string" } }, if_not_exists = true }) box.space.locks:create_index('expire', { parts = { { field = "expire", type = "unsigned" } }, unique = false, if_not_exists = true }) while true do box.ctl.wait_rw() local now = clock.time64() for _, t in box.space.locks.index.expire:pairs({ 0 }, { iterator = "GT" }) do if t[3] < now then local rc, err = pcall(box.space.locks.update, box.space.locks, { t["name"] }, { { "=", "expire", 0 } }) if not rc then log.info(err) break end else break end end fiber.sleep(1) end end)
Запуск локального кластера
Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt:
tt start
Для проверки статуса узлов выполним команду:
tt status
Для проверки того, что кластер собрался, необходимо подключиться по очереди на узлы и проверить состояние репликации:
tt connect app:instance-001 > box.info.replication
Скрытый текст
--- - 1: id: 1 uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8 lsn: 16 upstream: status: follow idle: 0.92459100019187 peer: 127.0.0.1:3301 lag: 7.9154968261719e-05 name: instance-001 downstream: status: follow idle: 0.91285500023514 vclock: {1: 16, 2: 5186, 3: 5013} lag: 0 2: id: 2 uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8 lsn: 5186 name: instance-002 3: id: 3 uuid: 7f40d300-e9e6-4916-adaa-b669b672256b lsn: 5013 upstream: status: follow idle: 0.91279700025916 peer: 127.0.0.1:3303 lag: 5.7220458984375e-05 name: instance-003 downstream: status: follow idle: 0.91363300010562 vclock: {1: 16, 2: 5186, 3: 5013} lag: 0 ...
Пример использования кластерных блокировок на Golang
Tarantool общается с приложениями с помощью msgpack-формата, который иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64, сделаем утилитарную функцию.
func toUint64(v any) uint64 { switch t := v.(type) { case int, int8, int16, int32, int64: return uint64(reflect.ValueOf(t).Int()) case uint, uint8, uint16, uint32, uint64: return reflect.ValueOf(t).Uint() default: panic("type error") } }
Для общения с кластером Tarantool воспользуемся готовым пулом подключений, который умеет автоматически вычислять, какой узел является лидером:
instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, } p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) }
Создадим структуру для хранения блокировки:
type Lock struct { name string token uint64 expire uint64 }
Создадим функция для взятия блокировки. Эта функция:
-
Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.
-
Принимает в качестве параметров контекст для выполнения запроса, пул соединений с кластером, имя блокировки и тайм-аут блокировки.
-
В случае успеха вернет объект блокировки, иначе вернет ошибку.
func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) { resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get() if err != nil { return nil, err } if len(resp) == 0 { return nil, fmt.Errorf("no response") } if resp[0] == nil { return nil, fmt.Errorf("failed") } data := resp[0].([]any) result := Lock{ name: name, token: toUint64(data[1]), expire: toUint64(data[2]), } return &result, nil }
Создадим функцию для отпускания блокировки. Эта функция:
-
Принимает контекст, пул соединений к кластеру, объект с блокировкой.
-
Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.
-
Вернет true, если блокировка успешно отпущена.
-
Вернет false в случае тех или иных ошибок.
func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) { resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get() if err != nil { return false, err } if len(resp) == 0 { return false, fmt.Errorf("no response") } return resp[0].(bool), nil }
Скрытый текст
instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, } p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var l *Lock for i := 0; i < 3; i++ { l, err = acquireLock(ctx, p, name, 10) if err != nil { time.Sleep(100 * time.Millisecond) continue } break } if l == nil { fmt.Println(name, "already locked") return } defer func() { ok, _ := releaseLock(ctx, p, l) if ok { fmt.Println(name, "success unlock") } else { fmt.Println(name, "lock expired") } }() fmt.Println(name, "success lock")
Весь код main.go
package main import ( "context" "fmt" "reflect" "sync" "time" "github.com/tarantool/go-tarantool/v2" _ "github.com/tarantool/go-tarantool/v2/datetime" _ "github.com/tarantool/go-tarantool/v2/decimal" "github.com/tarantool/go-tarantool/v2/pool" _ "github.com/tarantool/go-tarantool/v2/uuid" "github.com/tjarratt/babble" ) func toUint64(v any) uint64 { switch t := v.(type) { case int, int8, int16, int32, int64: return uint64(reflect.ValueOf(t).Int()) // a has type int64 case uint, uint8, uint16, uint32, uint64: return reflect.ValueOf(t).Uint() // a has type uint64 default: panic("type error") } } type Lock struct { name string token uint64 expire uint64 } func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) { resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get() if err != nil { return nil, err } if len(resp) == 0 { return nil, fmt.Errorf("no response") } if resp[0] == nil { return nil, fmt.Errorf("failed") } data := resp[0].([]any) result := Lock{ name: name, token: toUint64(data[1]), expire: toUint64(data[2]), } return &result, nil } func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) { resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get() if err != nil { return false, err } if len(resp) == 0 { return false, fmt.Errorf("no response") } return resp[0].(bool), nil } func main() { instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, } p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) } babbler := babble.NewBabbler() babbler.Count = 1 wg := sync.WaitGroup{} for i := 0; i < 1000; i++ { name := babbler.Babble() wg.Add(1) go func(name string) { defer wg.Done() ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() var l *Lock for i := 0; i < 3; i++ { l, err = acquireLock(ctx, p, name, 10) if err != nil { time.Sleep(100 * time.Millisecond) continue } break } if l == nil { fmt.Println(name, "already locked") return } defer func() { ok, _ := releaseLock(ctx, p, l) if ok { fmt.Println(name, "success unlock") } else { fmt.Println(name, "lock expired") } }() fmt.Println(name, "success lock") time.Sleep(50 * time.Millisecond) }(name) } wg.Wait() }
Итоги
Примерно за 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml-файла. Если один из узлов Tarantool упадет, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.
Полезные ссылки
ссылка на оригинал статьи https://habr.com/ru/articles/835958/
Добавить комментарий