Распределённые блокировки с помощью Tarantool 3

от автора

Распределенная блокировка — очень удобный инструмент в кластере, который помогает обеспечивать эксклюзивный доступ к некоторому общему ресурсу. Цель такой блокировки — обеспечить доступ к ресурсу лишь одному сервису или запросу в данный момент времени. Так предотвращается гонка за данными и их неконсистентность. Распределенная (или кластерная) блокировка называется так потому, что она обеспечивается несколькими узлами, и выход из строя одного из них не повлияет на приложение. В этой статье я расскажу, как реализовать этот инструмент с помощью Tarantool 3.

Tarantool 3

Tarantool 3 — это новая версия in-memory базы данных, которая лежит в основе промежуточного ПО для хранения и обработки данных Tarantool. Она полностью совместима с файлами и репликацией второй версии.

Перечислю основные характеристики Tarantool 3 в сравнении с Tarantool 2:

  • Cервер с конфигом.

  • Ориентация на кластерность.

  • Удобная система триггеров.

  • Переопределение сетевого API (IPROTO).

  • Упрощение настройки репликации.

  • Имена узлов вместо UUID.

  • Расширенная статистика по потреблению памяти.

  • Значения полей по умолчанию.

Подробнее можно почитать в другой статье на Хабре

Принцип работы

Блокировки будут храниться в таблице (спейсе) с такой структурой:

name

token

expire

Взятой блокировкой будет считаться наличие строки в таблице c ненулевым expire. Отпущенной блокировкой будет считаться или отсутствие строки в таблице, или нулевой expire.

Для надежного хранения состояния блокировки настроим синхронную репликацию. Это делается при создании таблицы (спейса). Для операций с блокировками будем использовать транзакции в режиме linearizable. Для автоматического переключения лидера настроим raft-фейловер.

Так будет выглядеть топология приложения.

Три узла с одним лидером и golang приложение

Три узла с одним лидером и golang приложение

Напишем хранимую процедуру на Lua для взятия блокировки. Процедура работает следующим образом:

  • Принимает параметры: имя и таймаут.

  • Проверяет, существует ли блокировка. Если нет — создает новую. 

  • Если блокировка существовала, то проверяет, отпущена ли она (expire == 0). 

  • Если отпущена, то наращивает token и устанавливает время expire. 

  • Если уже была взята, процедура возвращает nil.

function _G.acquireLock(name, timeout)     box.begin({ txn_isolation = "linearizable", timeout = timeout })       local lock = box.space.locks:get(name)     if lock == nil then         lock = { name, 0, clock.time64() + timeout * 1e9 }         box.space.locks:insert(lock)         box.commit()         return lock     end      if lock['expire'] == 0 then         box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },             { "=", "expire", clock.time64() + timeout * 1e9 })         box.commit()         return lock     end      box.commit()     return nil end

Важно учесть, что наш алгоритм запущен в кластере и использует время. Для корректности работы важно держать время синхронизированным. Как это сделать.

Теперь напишем процедуру на  Lua для отпускания блокировки. Процедура принимает параметры: имя и токен — и проверяет, что токен блокировки совпадает и expire не равен нулю. Тогда блокировка отпускается и процедура возвращает true, иначе — false.

function _G.releaseLock(name, token)      box.begin({ txn_isolation = "linearizable" })      local lock = box.space.locks:get(name)     if lock == nil then         box.commit()         return false     end      if lock['token'] == token and lock['expire'] ~= 0 then         box.space.locks:update({ name }, { { "=", "expire", 0 } })         box.commit()         return true     end      box.commit()     return false end

Для создания таблицы (спейса) с блокировками используется фоновая процедура. Она:

  1. Ждет, что узел станет лидером. 

  2. Создает таблицу с необходимыми полями.

  3. Запускает цикл для обработки тайм-аутов блокировок.

fiber.create(function()      fiber.name("expire-lock-fiber")      box.ctl.wait_rw()      box.schema.space.create("locks", {         is_sync=true,         if_not_exists=true})      box.space.locks:format({{name="name", type="string"},         {name='token', type='unsigned'},         {name='expire', type='unsigned'}})      box.space.locks:create_index('name', {         parts={{field="name", type="string"}},         if_not_exists=true})      box.space.locks:create_index('expire', {         parts={{field="expire", type="unsigned"}},         unique=false,         if_not_exists=true})      while true do          box.ctl.wait_rw()          local now = clock.time64()         for _, t in box.space.locks.index.expire:pairs({0}, {iterator="GT"}) do             if t[3] < now then                 local rc, err = pcall(box.space.locks.update, box.space.locks, {t["name"]}, {{"=", "expire", 0}})                 if not rc then                     log.info(err)                     break                 end             end         end          fiber.sleep(1)     end end)

Инициализация приложения

Сначала инициализируем рабочее окружение для разработки:

tt init

Создадим директорию будущего приложения. Приложения располагаются в instances.enabled:

mkdir instances.enabled/app

Создадим файл конфигурации будущего локального кластера. Этот файл будет содержать топологию и настройки узлов кластера:

touch instances.enabled/app/config.yml

В конфигурации укажем:

  • Что в кластере должно быть три узла.

  • Параметры фейловера для работы кластера.

  • Включение mvcc-режима работы.

  • Файл с хранимыми процедурами.

# Настраиваем пользователей и их секреты credentials:   users:     client:       password: "secret"     replicator:       password: "topsecret"       roles: [replication]  # Указываем под каким пользователем узлы Tarantool # будут подключаться друг к другу iproto:   advertise:     peer:       login: replicator  # Настраиваем raft failover для автоматического и # и ручного консистентного переключения лидера replication:   failover: election  # Использует мультиверсионный движок базы данных # для решения проблем грязных чтений database:   use_mvcc_engine: true  # Указываем исходный файл для создания API блокировок app:   file: init.lua  # Топология кластера # 3 узла объединённых репликацией groups:   group-001:     replicasets:       replicaset-001:         replication:           bootstrap_strategy: config         bootstrap_leader: instance-001         instances:           instance-001:             iproto:               listen:                 - uri: 127.0.0.1:3301           instance-002:             iproto:               listen:                 - uri: 127.0.0.1:3302           instance-003:             iproto:               listen:                 - uri: 127.0.0.1:3303 

Создадим файл, управляющий запуском локальных узлов кластера:

touch instances.enabled/app/instances.yaml

Укажем три узла для запуска:

instance-001: instance-002: instance-003:

Приложение на Lua

Создадим файл init.lua:

touch instances.enabled/app/init.lua

Полный листинг приложения:

local log = require('log') local fiber = require('fiber') local clock = require('clock')  log.info("starting application")  function _G.acquireLock(name, timeout)     box.begin({ txn_isolation = "linearizable", timeout = timeout })     local lock = box.space.locks:get(name)     if lock == nil then         lock = { name, 0, clock.time64() + timeout * 1e9 }         box.space.locks:insert(lock)         box.commit()         return lock     end      if lock['expire'] == 0 then         box.space.locks:update({ name }, { "=", "token", lock['token'] + 1 },             { "=", "expire", clock.time64() + timeout * 1e9 })         box.commit()         return lock     end      box.commit()     return nil end  function _G.releaseLock(name, token)     box.begin({ txn_isolation = "linearizable" })     local lock = box.space.locks:get(name)     if lock == nil then         box.commit()         return false     end      if lock['token'] == token and lock['expire'] ~= 0 then         box.space.locks:update({ name }, { { "=", "expire", 0 } })         box.commit()         return true     end      box.commit()     return false end  fiber.create(function()     fiber.name("expire-lock-fiber")     box.ctl.wait_rw()      box.schema.space.create("locks", {         is_sync = true,         if_not_exists = true     })     box.space.locks:format({ { name = "name", type = "string" },         { name = 'token',  type = 'unsigned' },         { name = 'expire', type = 'unsigned' } })      box.space.locks:create_index('name', {         parts = { { field = "name", type = "string" } },         if_not_exists = true     })     box.space.locks:create_index('expire', {         parts = { { field = "expire", type = "unsigned" } },         unique = false,         if_not_exists = true     })      while true do         box.ctl.wait_rw()         local now = clock.time64()         for _, t in box.space.locks.index.expire:pairs({ 0 }, { iterator = "GT" }) do             if t[3] < now then                 local rc, err = pcall(box.space.locks.update, box.space.locks, { t["name"] }, { { "=", "expire", 0 } })                 if not rc then                     log.info(err)                     break                 end             else                 break             end         end         fiber.sleep(1)     end end) 

Запуск локального кластера

Для запуска кластера из узлов Tarantool 3 воспользуемся командой tt:

tt start

Для проверки статуса узлов выполним команду:

tt status

Для проверки того, что кластер собрался, необходимо подключиться по очереди на узлы и проверить состояние репликации:

tt connect app:instance-001  > box.info.replication
Скрытый текст
--- - 1:     id: 1     uuid: 09652d7e-6b1d-4304-aad0-6ae058c847c8     lsn: 16     upstream:       status: follow       idle: 0.92459100019187       peer: 127.0.0.1:3301       lag: 7.9154968261719e-05     name: instance-001     downstream:       status: follow       idle: 0.91285500023514       vclock: {1: 16, 2: 5186, 3: 5013}       lag: 0   2:     id: 2     uuid: 8e62d6ea-badf-490d-9153-a342d0e48fd8     lsn: 5186     name: instance-002   3:     id: 3     uuid: 7f40d300-e9e6-4916-adaa-b669b672256b     lsn: 5013     upstream:       status: follow       idle: 0.91279700025916       peer: 127.0.0.1:3303       lag: 5.7220458984375e-05     name: instance-003     downstream:       status: follow       idle: 0.91363300010562       vclock: {1: 16, 2: 5186, 3: 5013}       lag: 0 ...

Пример использования кластерных блокировок на Golang

Tarantool общается с приложениями с помощью msgpack-формата, который иногда допускает изменения размера целочисленного типа. Чтобы привести всё к uint64, сделаем утилитарную функцию.

func toUint64(v any) uint64 { switch t := v.(type) { case int, int8, int16, int32, int64: return uint64(reflect.ValueOf(t).Int()) case uint, uint8, uint16, uint32, uint64: return reflect.ValueOf(t).Uint() default: panic("type error") } }

Для общения с кластером Tarantool воспользуемся готовым пулом подключений, который умеет автоматически вычислять, какой узел является лидером:

instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, }  p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) }

Создадим структуру для хранения блокировки:

type Lock struct { name   string token  uint64 expire uint64 }

Создадим функция для взятия блокировки. Эта функция:

  1. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  2. Принимает в качестве параметров контекст для выполнения запроса, пул соединений с кластером, имя блокировки и тайм-аут блокировки.

  3. В случае успеха вернет объект блокировки, иначе вернет ошибку.

func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) { resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get() if err != nil { return nil, err }  if len(resp) == 0 { return nil, fmt.Errorf("no response") }  if resp[0] == nil { return nil, fmt.Errorf("failed") }  data := resp[0].([]any)  result := Lock{ name:   name, token:  toUint64(data[1]), expire: toUint64(data[2]), }  return &result, nil }

Создадим функцию для отпускания блокировки. Эта функция:

  1. Принимает контекст, пул соединений к кластеру, объект с блокировкой. 

  2. Через пул подключений обращается к кластеру и вызывает хранимую процедуру acquireLock.

  3. Вернет true, если блокировка успешно отпущена.

  4. Вернет false в случае тех или иных ошибок.

func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) {  resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get() if err != nil { return false, err }  if len(resp) == 0 { return false, fmt.Errorf("no response") }  return resp[0].(bool), nil }
Скрытый текст
instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, } p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) }  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()  var l *Lock  for i := 0; i < 3; i++ { l, err = acquireLock(ctx, p, name, 10) if err != nil { time.Sleep(100 * time.Millisecond) continue } break }  if l == nil { fmt.Println(name, "already locked") return }  defer func() { ok, _ := releaseLock(ctx, p, l) if ok { fmt.Println(name, "success unlock") } else { fmt.Println(name, "lock expired") } }()  fmt.Println(name, "success lock")

Весь код main.go

package main  import ( "context" "fmt" "reflect" "sync" "time" "github.com/tarantool/go-tarantool/v2" _ "github.com/tarantool/go-tarantool/v2/datetime" _ "github.com/tarantool/go-tarantool/v2/decimal" "github.com/tarantool/go-tarantool/v2/pool" _ "github.com/tarantool/go-tarantool/v2/uuid" "github.com/tjarratt/babble" )  func toUint64(v any) uint64 { switch t := v.(type) { case int, int8, int16, int32, int64: return uint64(reflect.ValueOf(t).Int()) // a has type int64 case uint, uint8, uint16, uint32, uint64: return reflect.ValueOf(t).Uint() // a has type uint64 default: panic("type error") } }  type Lock struct { name   string token  uint64 expire uint64 }  func acquireLock(ctx context.Context, p pool.Pooler, name string, timeout uint64) (*Lock, error) { resp, err := p.Do(tarantool.NewCallRequest("acquireLock").Context(ctx).Args([]any{name, timeout}), pool.RW).Get() if err != nil { return nil, err } if len(resp) == 0 { return nil, fmt.Errorf("no response") } if resp[0] == nil { return nil, fmt.Errorf("failed") }  data := resp[0].([]any)  result := Lock{ name:   name, token:  toUint64(data[1]), expire: toUint64(data[2]), }  return &result, nil }  func releaseLock(ctx context.Context, p pool.Pooler, l *Lock) (bool, error) { resp, err := p.Do(tarantool.NewCallRequest("releaseLock").Context(ctx).Args([]any{l.name, l.token}), pool.RW).Get() if err != nil { return false, err }  if len(resp) == 0 { return false, fmt.Errorf("no response") }  return resp[0].(bool), nil }  func main() { instances := []pool.Instance{ { Name: "instance-001", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3301", }, }, { Name: "instance-002", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3302", }, }, { Name: "instance-003", Dialer: tarantool.NetDialer{ Address: "127.0.0.1:3303", }, }, }  p, err := pool.Connect(context.Background(), instances) if err != nil { panic(err) }  babbler := babble.NewBabbler() babbler.Count = 1    wg := sync.WaitGroup{}    for i := 0; i < 1000; i++ { name := babbler.Babble()  wg.Add(1) go func(name string) { defer wg.Done()  ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel()  var l *Lock for i := 0; i < 3; i++ { l, err = acquireLock(ctx, p, name, 10) if err != nil { time.Sleep(100 * time.Millisecond) continue } break } if l == nil { fmt.Println(name, "already locked") return } defer func() { ok, _ := releaseLock(ctx, p, l) if ok { fmt.Println(name, "success unlock") } else { fmt.Println(name, "lock expired") } }()  fmt.Println(name, "success lock") time.Sleep(50 * time.Millisecond)  }(name) } wg.Wait() }

Итоги

Примерно за 100 строк мы сделали на Tarantool 3 приложения для управления кластерными блокировками. Такое приложение может состоять из одного или нескольких узлов. Для регулирования количества узлов достаточно только редактирования yaml-файла. Если один из узлов Tarantool упадет, то сработает механизм автоматического выбора лидера, и приложение восстановит свою работоспособность.

Полезные ссылки


ссылка на оригинал статьи https://habr.com/ru/articles/835958/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *