
Привет! Меня зовут Артем Гаврилов и я работаю в Tarantool. Сегодня я расскажу, как быстро создать объектное хранилище на основе платформы in-memory вычислений Tarantool и распределённой файловой системы IPFS (InterPlanetary File System).
Мы рассмотрим пример шардирования стороннего приложения с помощью Tarantool и сделаем MVP объектного хранилища с отказоустойчивостью на уровне ЦОДа, в то время как более простые решения отказоустойчивы только на уровне нескольких серверов.
Тем, кто знаком с IPFS, вероятно, будет интересно читать начиная с раздела «С чем мы столкнёмся».
IPFS — это вообще что?
Если кратко, IPFS (от англ. InterPlanetary File System) — это полностью распределённое файловое хранилище. Оно обладает несколькими характерными свойствами:
- Доступ к данным по их хешу.
- Управление и доступ к файлам через протокол HTTP.
- Все файлы на ноде делятся на закреплённые (pinned) и незакреплённые. Различие между ними в том, что первые никогда не удаляются из ноды. По-настоящему нода хранит именно закреплённые файлы, а все остальные — это просто кеш на диске.
При отсутствии файла на конкретной ноде IPFS скачает его с другой. Закрепление делается вручную через API. - Если запросить у сервера файл, который на нём не хранится, то сервер сначала скачает его как незакреплённый, а потом отдаст по HTTP. А если ему не хватит места, то вытеснит один из незакреплённых файлов.
- Хранилище публично, все ноды связаны и образуют глобальный кластер. Однако данные, которые никто не запрашивает, по логике предыдущего пункта со временем уйдут из всех нод и навсегда потеряются.
Таким образом каждая нода IPFS является сразу шлюзом в HTTP, кешем и холодным хранилищем. Из этого следует, что система сама может переносить данные между нодами. Но IPFS нужна помощь в выборе того, как складывать данные для холодного хранения. Также крайне желательно всегда обращаться к серверу, на котором закреплён нужный файл.
Почему мы выбрали IPFS?
На то есть несколько причин.
- IPFS — законченный продукт, готовый хранить большие объёмы данных.
- IPFS широко применяется в различных сферах. Так, например, он используется криптовалютой Ethereum для реализации NFT токенов: в блокчейне сохраняются только 256 бит хеша файла, а сам файл публикуется в IPFS, не занимая место в блокчейне.
- Некоторые крупные компании используют публичные серверы, для кеширования своих данных (или для обхода блокировок). Например, Cloudflare.
- IPFS также устраняет некоторые проблемы безопасности. В частности, адресация на основе содержимого позволяет хранить и рассчитывать хеши в метаданных без создания отдельного решения.
Подробне про преимущества IPFS можно почитать здесь.
При чём тут Tarantool?
С помощью Tarantool (точнее, Tarantool Cartridge) мы будем шардировать данные и искать IPFS-ноды, на которых эти данные расположены.
Tarantool подходит для этой задачи, так как:
- У платформы есть готовый механизм шардирования (vshard), к которому можно добавить свою логику.
- Tarantool может общаться по HTTP с IPFS.
Кроме того, платформа Tarantool — это база данных со встроенным сервером приложений, что также сильно упростит реализацию.
Почему не IPFS Cluster?
Существует нативное решение IPFS Cluster, однако у него есть недостатки. Он просто считает количество копий закреплённых данных, а не распределяет их по наборам метрик, как типовой модуль шардирования Tarantool vshard. Это влияет на уровень отказоустойчивости: если собирать набор реплик из серверов в разных ЦОДах, то можно добиться отказоустойчивости даже при проблеме с одним из ЦОДов (а такие бывают). Помимо прочего, IPFS-кластер ограничивает нас отсутствием метаданных и поисковыми индексами только по хешу.
С чем мы столкнёмся?
Длительные операции
Работа с большими файлами — всегда долгая операция. При этом триггеры также накладывают ограничение: они должны выполняться в рамках одной транзакции и как следствие не могут делать сетевые вызовы. Поэтому надо будет предусмотреть асинхронное обновление.
Асинхронная обработка предполагает работу с очередями, и для этого мы будем использовать библиотеку Tarantool-queue, которая позволяет заводить локальные (не распределённые) очереди задач.
Большой объём трафика
Надо будет очень аккуратно пользоваться HTTP, не читая лишний раз тело запроса и активно пользуясь редиректами.
Сторонний API, под который надо подстроиться
IPFS — готовый продукт, менять исходный код которого мы не можем. Это означает, что наши возможности ограничиваются возможностями его API.
Базовая концепция решения
Мы будем придерживаться такой концепции:
- Разворачиваем рядом с каждым хранилищем Tarantool ноду IPFS.
- Управляем IPFS-нодой только с локального хранилища.
- При этом на IPFS-ноде лежат те же объекты, что и в локальном хранилище.
- Задачи, связанные с IPFS, выполняем в очередях.
- Когда мы хотим закрепить или открепить новый файл, мы запишем данные о нём в хранилище и поставим в соответствующую очередь задачу.
В конечном итоге получится такая топология:

Минимальный вариант: поиск по хешам
Схема данных и очереди задач
Делаем две записи о хеше:
- Шардированную — которая находится на тех серверах, на чьих локальных нодах IPFS мы хотим закрепить файлы (желаемое расположение данных).
- Нешардированную — которая соответствует последнему состоянию (закреплённости) в локальном хранилище (фактическое расположение данных).
Также сделаем две очереди: на закрепление (добавление) и открепление (удаление) данных:
local queue = require('queue') local function init_schema(is_master) local pinned = box.schema.space.create('pinned',{format={ {'ipfs_id', 'string'}, {'bucket_id', 'unsigned'} }, if_not_exists=true}) pinned:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true}) pinned:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true}) if is_master then queue.create_tube('ipfs_add', 'fifottl', {if_not_exists = true}) queue.create_tube('ipfs_remove', 'fifottl', {if_not_exists = true}) end local pinned_no_shard = box.schema.space.create('pinned_no_shard',{format={ {'ipfs_id', 'string'}, {'status', 'string'} }, if_not_exists=true}) pinned_no_shard:create_index('primary', {parts={{1, 'string'}}, unique=true, if_not_exists=true}) end return { init_schema=init_schema }
Задачи на взаимодействие с IPFS будем ставить при записи в шардированную очередь, а при их выполнении будем обновлять данные в нешардированной.
Немного технических полезностей
Небольшой модуль для взаимодействия с IPFS
Нам понадобятся:
- чтение конфигурации IPFS (чтобы узнать, куда кидать запрос);
- get_gateway_address — функция для получения адреса шлюза; не мудрим, просто добавим в конфигурацию IPFS-ноды свой параметр gateway_pub_address;
- управление закреплением файлов(добавление и удаление).
local http = require('http.client').new() local json = require('json') local api_prefix = 'http://127.0.0.1:5001' local function get_config() local resp = http:post(api_prefix..'/api/v0/config/show') if resp.status == 200 then return json.decode(resp.body) else error(resp.reason) end end local function get_address() local config, err = get_config() if err ~= nil then return nil, err end return assert(config.gateway_pub_address, 'no gateway_pub_address in ipfs config') end local function pin_add(ipfs_id) local resp = http:post(api_prefix..'/api/v0/pin/add?arg='..ipfs_id) assert(resp.status == 200, resp.reason) end local function pin_rm(ipfs_id) local resp = http:post(api_prefix..'/api/v0/pin/rm?arg='..ipfs_id) assert(resp.status == 200, resp.reason) end return { get_config = get_config, get_address = get_address, pin_add = pin_add, pin_rm = pin_rm }
Выполнение операции на всём наборе реплик одновременно
Наша очередь, а также спейс с нешардированными данными будут синхронно реплицироваться на весь набор реплик. При этом для полного повторения топологии нам надо положить рядом с каждым экземпляром Tarantool экземпляр IPFS. Поэтому при выполнении задачи из очереди надо вызывать функцию на всём наборе реплик. Для этого напишем небольшую функцию:
local function get_curr_replicaset() local me = cartridge_lua_api_topology.get_self() return cartridge.admin_get_servers(me.uuid)[1].replicaset end local function call_on_whole_replicaset(func, params, options) local servers_uri = {} for _, server in pairs(get_curr_replicaset().servers) do table.insert(servers_uri, server.uri) end local map_results, map_errors = cartridge_pool.map_call(func, params, {uri_list = servers_uri, timeout = options.timeout}) assert(map_errors == nil, map_errors) return map_results end
Как сделать отложенную синхронизацию с IPFS
Покажем в картинках, как будет происходить работа с асинхронным данными. Обратите внимание, что помимо простых операций записи и удаления у нас есть операция решардинга.Чтобы её правильно выполнять, будем при обычном удалении делать пометки в нешардируемом спейсе. Если пометка есть, то можно удалять сразу, иначе надо дождаться закрепления пина на новом хранилище.
Добавление

Удаление

Решардинг

Реализация
Нам понадобится обрабатывать следующие запросы к роутеру:
- HTTP-запрос, перенаправления на объект в IPFS:
GET /tnt_ipfs/pin/:ipfs_id - HTTP-запрос на добавление пина:
PUT /tnt_ipfs/pin/:ipfs_id - HTTP-запрос на удаление пина:
DELETE /tnt_ipfs/pin/:ipfs_id - HTTP-запрос на получение статуса пина:
GET /tnt_ipfs/pin/:ipfs_id/status
А также следующие запросы к хранилищу:
- Запрос, который отдаст нам адрес соответствующего ему шлюза:
ipfs.get_local_ipfs_addr - Запрос на добавление пина:
ipfs.pin_write - Запрос на удаление пина:
ipfs.pin_delete - Запрос на получение статуса пина:
ipfs.pin_status - Запрос между хранилищами на вызов:
ipfs.pin_add—ipfs.internal.pin_add - Запрос между хранилищами на вызов:
ipfs.pin_rm—ipfs.internal.pin_rm
Код для роли роутера
local function init(opts) -- luacheck: no unused args local httpd = assert(cartridge.service_get('httpd'), "Failed to get httpd service") httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req) local ipfs_id = req:stash('ipfs_id') local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id) local gateway_addr, err = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr') if err ~= nil then return { status = 500, body = err } end return req:redirect_to(gateway_addr..'/ipfs/'..ipfs_id) end) httpd:route({method = 'PUT', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req) local ipfs_id = req:stash('ipfs_id') local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id) local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_write', {{ipfs_id, bucket_id}}) if err ~= nil then return { status = 500, body = err } end return {status = 200} end) httpd:route({method = 'DELETE', path = '/tnt_ipfs/pin/:ipfs_id', public = true}, function(req) local ipfs_id = req:stash('ipfs_id') local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id) local _, err = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {ipfs_id}) if err ~= nil then return { status = 500, body = err } end return {status = 200} end) httpd:route({method = 'GET', path = '/tnt_ipfs/pin/:ipfs_id/status', public = true}, function(req) local ipfs_id = req:stash('ipfs_id') local bucket_id = vshard.router.bucket_id_mpcrc32(ipfs_id) local pin_status, err = vshard.router.callro(bucket_id, 'ipfs.pin_status', {ipfs_id}) if err ~= nil then return { status = 500, body = err } end return { status = 200, body = pin_status } end) return true end
Код для роли хранилища
Тут всё немного сложнее:
- Напишем в отдельном файле реализацию всех API-запросов:
local ipfs = require('app.ipfs') local function get_local_ipfs_addr() return ipfs.get_address() end local function pin_write(tuple) box.space.pinned:put(tuple) end local function pin_delete(ipfs_id) box.space.pinned_no_shard:put({ipfs_id, 'CAN_UNPIN'}) box.space.pinned:delete(ipfs_id) end local function pin_status(ipfs_id) if box.space.pinned:get(ipfs_id) ~= nil then -- Want to pin if box.space.pinned_no_shard:get(ipfs_id) ~= nil then return 'Pinned' else return 'Wait pining' end else -- Want to unpin local pns = box.space.pinned_no_shard:get(ipfs_id) if pns ~= nil then if pns.status == 'CAN_UNPIN' then return 'Wait unpining' else return 'Wait reshard' end else return 'Not found' end end end return { ipfs = { get_local_ipfs_addr = get_local_ipfs_addr, pin_write = pin_write, pin_delete = pin_delete, pin_status = pin_status, internal = { pin_add = ipfs.pin_add, pin_rm = ipfs.pin_rm } } }
- Нам понадобится триггер для постановки задач в очередь:
local function on_replace(old, new, s, op) if old == nil then if new == nil then -- ничего не делаем else queue.tube.ipfs_add:put(new.ipfs_id, {delay=1}) end else if new == nil then queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1}) else if old.ipfs_id ~= new.ipfs_id then queue.tube.ipfs_add:put(new.ipfs_id, {delay=1}) queue.tube.ipfs_remove:put(old.ipfs_id, {delay=1}) end end end end
- Ещё нам понадобятся воркеры для обработки задач:
local ran = false local function add_queue_worker() while ran do local task = queue.tube.ipfs_add:take() -- проверяем, актуально ли задание local p = box.space.pinned:get(task[3]) local pns = box.space.pinned_no_shard:get(task[3]) if p == nil or pns ~= nil then queue.tube.ipfs_add:ack(task[1]) else -- пытаемся закрепить local ok, err = pcall(function() call_on_whole_replicaset('ipfs.internal.pin_add', {task[3]}) end) if ok then -- удалось box.space.pinned_no_shard:put({task[3], 'PINNED'}) queue.tube.ipfs_add:ack(task[1]) else -- не удалось log.error(err) queue.tube.ipfs_add:release(task[1], {delay=10}) end end end end local function rm_queue_worker() while ran do local task = queue.tube.ipfs_remove:take() -- проверяем, актуально ли задание local p = box.space.pinned:get(task[3]) local pns = box.space.pinned_no_shard:get(task[3]) if p ~= nil or pns == nil then queue.tube.ipfs_add:ack(task[1]) else local ok, err -- проверяем, появился ли объект на новом месте ok, err = pcall(function() local pns = box.space.pinned_no_shard:get(task[3]) if pns and pns.status == 'PINNED' then -- проверяем, появился ли пин на новом месте local bucket_id = vshard.router.bucket_id_mpcrc32(task[3]) assert(vshard.router.callro(bucket_id, 'ipfs.pin_status', {task[3]}) == 'Pinned') end end) if ok == nil then --рано откреплять queue.tube.ipfs_remove:release(task[1], {delay=10}) else -- пытаемся открепить ok, err = pcall(function() call_on_whole_replicaset('ipfs.internal.pin_rm', {task[3]}) end) if ok ~= nil then -- удалось box.space.pinned_no_shard:delete(task[3]) queue.tube.ipfs_remove:ack(task[1]) else -- не удалось log.error(err) queue.tube.ipfs_remove:release(task[1], {delay=10}) end end end end end
- Добавим функции для включения и выключения триггера и воркеров:
local function activate() if not ran then -- запускаем воркеры ran = true fiber.create(add_queue_worker) fiber.create(rm_queue_worker) -- включаем триггеры if #box.space.pinned:on_replace() == 0 then box.space.pinned:on_replace(on_replace) end end end local function deactivate() if ran then -- останавливаем воркеры ran = false -- выключаем триггеры if #box.space.pinned:on_replace() == 1 then box.space.pinned:on_replace(nil, on_replace) end end end
- Ну и наконец запихнём в роль включение триггера и воркеров только для мастера:
local function init(opts) -- luacheck: no unused args init_schema.init_schema(opts.is_master) rawset(_G, 'ipfs', storage_api) if opts.is_master then storage_master_service.activate() else storage_master_service.deactivate() end return true end local function stop() -- luacheck: no unused args rawset(_G, 'ipfs') storage_master_service.deactivate() return true end local function apply_config(conf, opts) -- luacheck: no unused args init_schema.init_schema(opts.is_master) if opts.is_master then storage_master_service.activate() else storage_master_service.deactivate() end return true end
Запускаем и тестируем
Запускать будем в Docker. При этом в один контейнер поместим сразу и Tarantool, и IPFS (по одному экземпляру). Для этого воспользуемся cartridge pack docker и следующим Docker-файлом:
FROM centos:7 RUN yum -y update; yum -y install wget; \ wget https://dist.ipfs.io/go-ipfs/v0.12.2/go-ipfs_v0.12.2_linux-amd64.tar.gz; \ tar -xvzf go-ipfs_v0.12.2_linux-amd64.tar.gz; \ cd go-ipfs; \ ./install.sh
Теперь с помощью docker-compose соберём кластер (полный Docker-файл можно посмотреть в репозитории):
version: '2.1' services: Tarantool-ipfs1: image: tnt_ipfs:0.1.0.0 container_name: Tarantool-ipfs1 environment: - IPFS_PATH=/data-ipfs - CARTRIDGE_DATA_DIR=/data - TARANTOOL_INSTANCE_NAME=r1-router - TARANTOOL_ADVERTISE_URI=172.19.1.1:3301 - TARANTOOL_CLUSTER_COOKIE=secret - TARANTOOL_HTTP_PORT=8001 ports: - 4001:4001 # IPFS p2p - 5001:5001 # IPFS API - 8081:8081 # IPFS Gateway - 3301:3301 # Tarantool Iproto - 8001:8001 # Tarantool HTTP volumes: - ../data/Tarantool-ipfs1:/data - ../data/Tarantool-ipfs1-ipfs:/data-ipfs networks: # Нужно. чтобы Tarantool не ругался при запуске на смену IP main: ipv4_address: 172.19.1.1 ... networks: main: ipam: config: - subnet: 172.19.1.0/16
Чтобы IPFS запускалась вместе с Tarantool, добавим в начало init.lua строку require('os').execute('ipfs daemon --init --writable --enable-gc &')
Остаётся сконфигурировать IPFS. Сделать это можно только из Docker, так как по умолчанию API принимает запросы только от 127.0.0.1. Также надо указать IP, на котором мы открывали порт для входящих соединений.
#!/usr/bin/env bash my_ip="X.X.X.X" for i in 1 2 3 4 5; do docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=gateway_pub_address&arg=http://192.168.1.69:808$i" docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.API&arg=/ip4/0.0.0.0/tcp/5001" docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Announce&arg=\[\"/ip4/$my_ip/tcp/400$i\"\]&json=1" docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config?arg=Addresses.Gateway&arg=/ip4/0.0.0.0/tcp/8081" docker-compose exec "Tarantool-ipfs$i" curl --request POST -L --url "http://127.0.0.1:5001/api/v0/config/show" docker-compose restart "Tarantool-ipfs$i" done
Заработало. Теперь можно поиграть, получив, например, вот этот объект: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu. При этом IPFS будет поначалу довольно долго его искать, прежде чем закрепить, но закрепив, будет отдавать мгновенно.
И ещё немного скриншотов. Для REST API я предпочитаю использовать Insomnia. Файлик с коллекцией запросов в конце статьи.
Для теста мы будем использовать вот этот хеш: Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu.
- Получим по API IPFS список закреплённых сейчас файлов. Изначально он не пустой, это нормально.

- Посмотрим статус закрепления:
GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu/status.

Как и ожидалось, о закреплении пока ничего не известно. - Воспользуемся нашим API и закрепим наш объект
PUT /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu.

-
Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет
Wait pining, но потом станетPinned:


- Снова получим по API IPFS список закреплённых сейчас файлов.

Файл закреплён и теперь точно никуда не уйдёт из хранилища. - Посмотрим на сам файл
GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu:

Если раскрыть раздел Timeline, то можно увидеть тот самый редирект.

- Попробуем открепить
DELETE /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu:

-
Посмотрим статус закрепления снова, если делать это очень быстро, то можно поймать момент, когда он будет
Wait pining, но потом станетPinned:


Обратите внимание, что GET /tnt_ipfs/pin/Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu вполне успешно выполнится, даже если файл не закреплён. Однако, если вы ранее не запрашивали или не закрепляли файл, запрос будет выполняться дольше. Дело в том что IPFS Gateway будет пытаться найти файл на чужих нодах. А незакрепленный файл со временем может быть вытеснен из нашей ноды. И если при этом никто в интернете его не сохранит, то он и вовсе будет безвозвратно утрачен.
Добавляем индексный слой
Если вы не знакомы с индексными слоями, то общие идеи их построения можно посмотреть здесь: https://habr.com/ru/company/vk/blog/657789/.
Схема данных
Индексный слой будет состоять из следующих записей:
| path | bucket_id | name | is_dir | ipfs_id |
|---|---|---|---|---|
| полный путь до родительского каталога | bucket_id | имя | является ли запись каталогом | хеш в IPFS |
*bucket_id строится по path
Таким образом можно будет рекурсивно искать файлы.
Сразу добавим в код создание спейсов:
local objects = box.schema.space.create('objects',{format={ {'path', 'string'}, {'bucket_id', 'unsigned'}, {'name', 'string'}, {'is_dir', 'boolean'}, {'ipfs_id', 'string', is_nullable = true} }, if_not_exists=true}) objects:create_index('primary', {parts={{1, 'string'},{3, 'string'}}, unique=true, if_not_exists=true}) objects:create_index('bucket_id', {parts={{2, 'unsigned'}}, unique=false, if_not_exists=true})
Реализация
Нам понадобится обрабатывать такие запросы к роутеру:
- HTTP-запрос на добавление директории:
PUT /tnt_ipfs/storage/*full_path?type=dir - HTTP-запрос на добавление объекта:
PUT /tnt_ipfs/storage/*full_path?type=object&ipfs_id=... - HTTP-запрос, перенаправляющий на объект в IPFS или отдающий листинг директории:
GET /tnt_ipfs/storage/*full_path - HTTP-запрос на удаление директории или объекта:
DELETE /tnt_ipfs/storage/*full_path
И запросы к хранилищу:
- Запрос на добавление директорий или объектов:
storage.add - Запрос на удаление директорий или объектов:
storage.del - Запрос на получение информации о директории или объекте:
storage.get - Запрос на листинг директории:
storage.ls
Код для роли роутера
local function convert_path(full_path) local path, name = full_path:match("^(.*)/(.+)$") if not path then return '', full_path end return path, name end ... httpd:route({method = 'PUT', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req) local full_path = req:stash('full_path') local path, name = convert_path(full_path) local bucket_id = vshard.router.bucket_id_mpcrc32(path) local type = req:query_param('type') if type == 'dir' then local _, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, true}) if err ~= nil then return { status = 500, body = err } end return {status = 200} elseif type == 'object' then local ipfs_id = req:query_param('ipfs_id') local bucket_id2 = vshard.router.bucket_id_mpcrc32(ipfs_id) local _, err = vshard.router.callrw(bucket_id2, 'ipfs.pin_write', {{ipfs_id, bucket_id2}}) if err ~= nil then return { status = 500, body = err } end _, err = vshard.router.callrw(bucket_id, 'storage.add', {path, bucket_id, name, false, ipfs_id}) if err ~= nil then return { status = 500, body = err } end return {status = 200} end return {status = 400} end) httpd:route({method = 'DELETE', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req) local full_path = req:stash('full_path') local path, name = convert_path(full_path) local bucket_id = vshard.router.bucket_id_mpcrc32(path) local deleted, err = vshard.router.callrw(bucket_id, 'storage.del', {path, name}) if deleted == nil or err ~= nil then return { status = 500, body = err } end if deleted.is_dir then return {status = 200} else bucket_id = vshard.router.bucket_id_mpcrc32(deleted.ipfs_id) local _, err2 = vshard.router.callrw(bucket_id, 'ipfs.pin_delete', {deleted.ipfs_id}) if err2 ~= nil then return { status = 500, body = err2 } end return {status = 200} end end) httpd:route({method = 'GET', path = '/tnt_ipfs/storage/*full_path', public = true}, function(req) local full_path = req:stash('full_path') local path, name = convert_path(full_path) local bucket_id = vshard.router.bucket_id_mpcrc32(path) local main_record, err if path == '' then --каталог в корне main_record = {is_dir=true} else main_record, err = vshard.router.callro(bucket_id, 'storage.get', {path,name}) if err ~= nil then return { status = 500, body = err } end if main_record == nil then return {status = 404} end end if main_record.is_dir then bucket_id = vshard.router.bucket_id_mpcrc32(full_path) local result, err2 = vshard.router.callro(bucket_id, 'storage.ls', {full_path}) if err2 ~= nil then return { status = 500, body = err2 } end return { status = 200, body = result } else bucket_id = vshard.router.bucket_id_mpcrc32(main_record.ipfs_id) local gateway_addr, err2 = vshard.router.callbro(bucket_id, 'ipfs.get_local_ipfs_addr') if err2 ~= nil then return { status = 500, body = err2 } end return req:redirect_to(gateway_addr..'/ipfs/'..main_record.ipfs_id) end end)
API хранилища
local function storage_add(path,bucket_id,name,is_dir,ipfs_id) if is_dir then box.space.objects:put({path,bucket_id,name,true}) else box.space.objects:put({path,bucket_id,name,false,ipfs_id}) end end local function storage_del(path,name) local data = box.space.objects:delete({path,name}) assert(data ~= nil) return data:tomap({names_only=true}) end local function storage_get(path,name) local data = box.space.objects:get({path,name}) assert(data ~= nil) return data:tomap({names_only=true}) end local function storage_ls(path) local result = {} for _,i in box.space.objects:pairs({path}) do table.insert(result, {name = i.name, is_dir = i.is_dir}) end return result end
Запускаем и тестируем
- Создадим каталог
PUT /tnt_ipfs/storage/a?type=dir. Его мы увидеть не сможем, потому что он пустой. - Создадим второй каталог внутри первого:
PUT /tnt_ipfs/storage/a/b?type=dir. - Посмотрим содержимое первого каталога:
GET /tnt_ipfs/storage/a.

- Создадим объект внутри первого каталога:
PUT /tnt_ipfs/storage/a/c?type=object&ipfs_id=Qme7ss3ARVgxv6rXqVPiikMJ8u2NLgmgszg13pYrDKEoiu. - Снова посмотрим содержимое первого каталога:
GET /tnt_ipfs/storage/a.

- Посмотрим на сам файл:
GET /tnt_ipfs/storage/a/c:

Если в разделе Timeline всё тот же редирект.

- Попробуем удалять файлы и директории:
DELETE /tnt_ipfs/storage/a/b:

DELETE /tnt_ipfs/storage/a/с:

Смотрим, как всё классно. Обсуждаем, что получилось
Чем хорошо это решение?
Приложив не так много усилий, мы получили работающий MVP объектного хранилища. И
если заменить gateway_pub_address, то можно использовать серверы сторонних провайдеров для кеша ваших данных.
Решение гарантирует отказоустойчивость при проблемах с серверами в n-1 ЦОДах при условии, что все наборы реплик собраны из n хранилищ в разных ЦОДах. Также мы получили из коробки балансировку по всем нодам IPFS.
Как можно применить это на практике?
- Хранить большие объёмы публичных данных.
- Бекапить большие объёмы данных, уже залитых в IPFS. К примеру, NFT-токены записаны в блокчейне Ethereum как хеши, при этом сами файлы находятся в IPFS.
- По описанным в статье принципам можно шардировать и другие приложения.
Чего нехватает, чтобы это решение можно было назвать Prod-Ready?
Есть много причин считать это решение, скорее, MVP, нежели Prod-Ready. Однако его доработка явно выходит за рамки статьи.
-
Инструменты. Настоящее Prod-Ready решение должно содержать инструменты для мониторинга, исправления ошибок и замера использования пространства.
-
Reference counter. По-хорошему, на закреплениях должен быть Reference counter, чтобы их можно было безопасно добавлять в поиск несколько раз (иначе при удалении одного места в поиске произойдёт открепление и данные могут потеряться).
-
Добавление метаданных. В настоящих объектных хранилищах есть метаданные, которые могут использоваться для поиска, управления длительностью хранения файлов и прочего.
-
Разграничение прав. Хоть это и публичное хранилище, управление правами на запись в поисковый слой и ограничение пространства для разных пользователей всё равно является полезной функциональностью.
-
Альтернативные варианты шардирования. Существует более продвинутые варианты шардирования, учитывающие объём данных, нагрузку на диск и сеть. Их также можно применить к конечному приложению, поставив для управления и поиска Tarantool. Однако это не будет простым повторением шардирования, что многократно усложнит задачу.
Репозиторий с полным кодом: https://github.com/Artem3213212/TNT-IPFS
ссылка на оригинал статьи https://habr.com/ru/company/vk/blog/681800/

















Добавить комментарий