P2P на Go: библиотека libp2p

от автора

Привет, Хабр!

Сегодня рассмотрим библиотеку libp2p в Go. libp2p — это модульная библиотека для построения P2P-сетей. Libp2p выросла из проекта IPFS, но теперь активно используется в блокчейнах, мессенджерах и других децентрализованных приложениях. Главная фича библиотеки — она даёт возможность полностью контролировать P2P-коммуникации.

Libp2p разделена на несколько модулей, которые можно подключать по мере необходимости:

  1. Transport: низкоуровневое соединение (TCP, WebSocket, QUIC, WebRTC).

  2. Muxing: позволяет мультиплексировать несколько потоков данных через одно соединение (например, протокол Yamux).

  3. Security: отвечает за шифрование трафика (Noise или TLS).

  4. Peer Discovery: поиск пиров в сети.

  5. PubSub: широковещательная рассылка данных.

  6. NAT Traversal: обход NAT и создание прямых соединений.

Всё это управляется через набор опций, которые вы передаёте при создании хоста.

Синтаксис: ключевые функции и модули

Создание базового хоста

Хост — это сердце любого P2P-приложения. Он представляет узел в сети, управляет подключениями и адресами.

Функция libp2p.New

func New(opts ...Option) (host.Host, error)

opts — список опций, которые задают поведение хоста. Возвращает объект host.Host, который вы будете использовать для работы с соединениями.

import ( "fmt" "log"  libp2p "github.com/libp2p/go-libp2p" )  func main() { h, err := libp2p.New() if err != nil { log.Fatalf("Ошибка создания хоста: %v", err) } defer h.Close()  fmt.Println("Хост создан. ID:", h.ID()) }

Опции хоста

Libp2p позволяет настраивать хост с помощью опций. Самые важные из них:

libp2p.Identity

Задаёт приватный ключ, который будет использоваться для идентификации узла.

import ( "crypto/rand" "github.com/libp2p/go-libp2p/core/crypto" )  func createIdentity() crypto.PrivKey { priv, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { log.Fatalf("Ошибка генерации ключа: %v", err) } return priv }  h, err := libp2p.New( libp2p.Identity(createIdentity()), )

libp2p.ListenAddrs

Задаёт адреса, на которых хост будет принимать подключения.

import "github.com/multiformats/go-multiaddr"  addr, _ := multiaddr.NewMultiaddr("/ip4/0.0.0.0/tcp/9000") h, err := libp2p.New( libp2p.ListenAddrs(addr), )

libp2p.Security

Добавляет поддержку шифрования. Например, можно использовать протокол Noise:

import "github.com/libp2p/go-libp2p/p2p/security/noise"  h, err := libp2p.New( libp2p.Security(noise.ID, noise.New), )

Работа с пирами

Для подключения к другому узлу используйте host.Connect:

import ( "context" "github.com/libp2p/go-libp2p/core/peer" )  peerAddr, _ := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/QmPeerID") err := h.Connect(context.Background(), *peerAddr) if err != nil { log.Fatalf("Ошибка подключения: %v", err) }

Libp2p использует протоколы, чтобы передавать данные между узлами. Вот как открыть поток и отправить сообщение:

import "github.com/libp2p/go-libp2p/core/network"  stream, err := h.NewStream(context.Background(), peerID, "/my-protocol/1.0.0") if err != nil { log.Fatalf("Ошибка создания потока: %v", err) }  _, err = stream.Write([]byte("Привет, P2P!")) if err != nil { log.Fatalf("Ошибка записи: %v", err) }

PubSub

Если нужно реализовать распределённый чат или передачу событий, используйте модуль PubSub. Пример:

import ( "context" "fmt" "log"  libp2p "github.com/libp2p/go-libp2p" pubsub "github.com/libp2p/go-libp2p-pubsub" )  func main() { // Создаём хост h, err := libp2p.New() if err != nil { log.Fatalf("Ошибка создания хоста: %v", err) }  // Создаём PubSub ps, err := pubsub.NewGossipSub(context.Background(), h) if err != nil { log.Fatalf("Ошибка создания PubSub: %v", err) }  // Подключаемся к теме topic, err := ps.Join("chat-room") if err != nil { log.Fatalf("Ошибка подписки: %v", err) }  // Получаем сообщения sub, _ := topic.Subscribe() go func() { for { msg, _ := sub.Next(context.Background()) fmt.Println("Получено сообщение:", string(msg.Data)) } }()  // Отправляем сообщения topic.Publish(context.Background(), []byte("Привет, P2P!")) }

Особенности NAT traversal

Libp2p поддерживает hole punching, чтобы соединять узлы за NAT. Вот как включить его:

import "github.com/libp2p/go-libp2p/p2p/protocol/holepunch"  h, err := libp2p.New( libp2p.EnableHolePunching(), )

Пример P2P-файлообменника с PubSub и NAT Traversal

Создадим прототип P2P-файлообменника, который:

  • Позволяет узлам обнаруживать друг друга.

  • Передаёт метаинформацию о файлах через PubSub.

  • Реализует передачу файлов напрямую через стримы.

  • Работает за NAT благодаря NAT traversal.

Каждый узел будет запускаться с уникальным идентификатором и подключается к общей PubSub-теме file-share. Узлы обмениваются информацией о доступных файлах через PubSub. Когда один из узлов хочет скачать файл, он подключается к владельцу файла и получает данные через поток.

Код:

package main  import ( "bufio" "context" "crypto/rand" "fmt" "io" "log" "os" "strings" "time"  libp2p "github.com/libp2p/go-libp2p" crypto "github.com/libp2p/go-libp2p/core/crypto" pubsub "github.com/libp2p/go-libp2p-pubsub" peer "github.com/libp2p/go-libp2p/core/peer" )  const TopicName = "file-share"  type FileInfo struct { Name string Size int64 Hash string }  func main() { // Создаём P2P-хост host, err := libp2p.New( libp2p.EnableHolePunching(), libp2p.Identity(generateIdentity()), ) if err != nil { log.Fatalf("Ошибка создания хоста: %v", err) } defer host.Close()  fmt.Println("Хост запущен с ID:", host.ID())  // Создаём PubSub ctx := context.Background() ps, err := pubsub.NewGossipSub(ctx, host) if err != nil { log.Fatalf("Ошибка создания PubSub: %v", err) }  // Подключаемся к теме topic, err := ps.Join(TopicName) if err != nil { log.Fatalf("Ошибка подписки на тему: %v", err) }  sub, err := topic.Subscribe() if err != nil { log.Fatalf("Ошибка подписки: %v", err) }  // Запускаем слушатель сообщений go listenForMessages(ctx, sub)  // Основной цикл for { fmt.Println("Введите команду: [share <путь к файлу>] или [download <hash>]") reader := bufio.NewReader(os.Stdin) command, _ := reader.ReadString('\n') command = strings.TrimSpace(command)  if strings.HasPrefix(command, "share") { // Команда для публикации файла filePath := strings.TrimPrefix(command, "share ") shareFile(ctx, topic, filePath) } else if strings.HasPrefix(command, "download") { // Команда для скачивания файла hash := strings.TrimPrefix(command, "download ") downloadFile(ctx, host, hash) } } }  // Генерация приватного ключа для узла func generateIdentity() crypto.PrivKey { priv, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { log.Fatalf("Ошибка генерации ключа: %v", err) } return priv }  // Обработка входящих сообщений func listenForMessages(ctx context.Context, sub *pubsub.Subscription) { for { msg, err := sub.Next(ctx) if err != nil { log.Printf("Ошибка получения сообщения: %v", err) continue }  fmt.Printf("Сообщение от %s: %s\n", msg.ReceivedFrom, string(msg.Data)) } }  // Публикация информации о файле func shareFile(ctx context.Context, topic *pubsub.Topic, filePath string) { fileInfo, err := os.Stat(filePath) if err != nil { log.Printf("Ошибка чтения файла: %v", err) return }  // Генерируем хэш файла hash := fmt.Sprintf("%x", fileInfo.ModTime().UnixNano())  // Публикуем информацию о файле message := fmt.Sprintf("file|%s|%d|%s", fileInfo.Name(), fileInfo.Size(), hash) if err := topic.Publish(ctx, []byte(message)); err != nil { log.Printf("Ошибка публикации сообщения: %v", err) return }  fmt.Printf("Файл %s опубликован! Хэш: %s\n", fileInfo.Name(), hash) }  // Скачивание файла func downloadFile(ctx context.Context, host libp2p.Host, hash string) { fmt.Println("Поиск файла с хэшем:", hash)  // В реальном приложении нужно было бы найти пира с этим файлом через PubSub или DHT // Здесь просто демонстрируем подключение peerAddr, _ := peer.AddrInfoFromString("/ip4/127.0.0.1/tcp/9000/p2p/<peer-id>")  err := host.Connect(ctx, *peerAddr) if err != nil { log.Printf("Ошибка подключения к узлу: %v", err) return }  stream, err := host.NewStream(ctx, peerAddr.ID, "/file-transfer/1.0.0") if err != nil { log.Printf("Ошибка создания потока: %v", err) return } defer stream.Close()  // Читаем файл из потока file, err := os.Create(hash + "_downloaded") if err != nil { log.Printf("Ошибка создания файла: %v", err) return } defer file.Close()  fmt.Println("Скачивание начато...") _, err = io.Copy(file, stream) if err != nil { log.Printf("Ошибка при загрузке файла: %v", err) return }  fmt.Println("Файл успешно скачан!") }

Пример работает так: все узлы подключаются к общей теме file-share и обмениваются информацией о файлах. Если узел хочет поделиться файлом, он отправляет сообщение с названием, размером и хэшем файла. Другой узел, заинтересованный в этом файле, находит владельца через PubSub, подключается к нему и скачивает файл напрямую через поток. Чтобы всё было круче, можно добавить распределённый поиск узлов через DHT, шифровать передачу данных и настроить репликацию файлов, чтобы они не потерялись, если кто-то из узлов отключится.

Подробнее с библиотекой можно ознакомиться здесь.


Напоследок напоминаю про открытые уроки, которые пройдут в Otus завтра (успевайте записаться!):

  • «Коммуникация между микросервисами с помощью RabbitMQ в ASP.NET». Подробнее

  • «Хранение данных в Kubernetes: Volumes, Storages, Stateful-приложения». Подробнее

Полный список открытых уроков по разработке, а также по всем IT-направлениям можно посмотреть в календаре мероприятий.


ссылка на оригинал статьи https://habr.com/ru/articles/876996/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *