Кастомный сервер конвертации файлов для Битрикс24

от автора

Пару слов обо мне

Меня зовут Дмитрий, я являюсь PHP разработчиком. Работаю с Битрикс24, Laravel и Go.

Проблема Битрикса

Как известно, рекомендуемое окружение для Битрикса – их собственная разработка BitrixVM на базе CentOS. Иногда такое окружение не устраивает заказчиков, поэтому выбирают Docker или сервер с установленным LEMP стеком. 

При переходе на окружение отличное от BitrixVM, существует две основные проблемы – отсутствие сервера очередей Push&Pull и сервера конвертации файлов. 

Первая проблема решаема: на просторах гитхаба уже существует рабочее решение для развертывания локального сервера в Docker. А также можно использовать облачный сервер, так как для его работы не требуется, чтобы портал был доступен извне. 

Со второй проблемой облако уже не всегда возможно использовать по ряду причин:

  • необходимость доступности сайта извне (не подходит для полностью закрытых окружений);

  • опасения заказчиков по поводу передачи конфиденциальных документов на облачный сервер.

Единственное решение – установка BitrixVM на отдельном сервере/в докере, с развертыванием бэкапа внутри и использование штатного сервера, встроенного в окружение, что далеко не всегда удобно.

С этим сталкивался и я на закрытых окружениях крупных российских заказчиков, которые далеко не всегда соглашались открывать доступ к порталу из-за соображений безопасности.

После нескольких таких кейсов было решено написать свой полноценный аналог, который бы отвечал нескольким критериям:

  • Работал в докере и запускался в несколько кликов;

  • Была возможность легко задать количество воркеров под различную нагрузку.

В качестве языка программирования было решено выбрать Go.

Реализация

Для начала нам необходимо понять, что представляет собой штатный сервер конвертации: по сути, это не более чем:

  • API сервер с одним эндпоинтом;

  • RabbitMQ сервер;

  • Consumer для обработки заданий из очереди, со следующим установленным ПО: LibreOffice, MPEG, ImageMagic.

Посмотреть реализацию на PHP можно в исходниках модуля Битрикса. Для упрощения я покажу только основной код, остальное всегда можно будет посмотреть в репозитории.

API сервер

Для API мы будем использовать библиотеку go-chi/chi.

Реализуем единственный обработчик – convert, с преобразованием данных в структуру ConvertTask и последующим добавлением в одну из очередей:

  • main_preview – используется в предпросмотре файлов;

  • documentgenerator_create – используется в генераторе документов CRM.

// internal/http-server/handlers/convert/convert.go func New(ctx context.Context, log *slog.Logger, rabbit *rabbitmq.Rabbit) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) {  const op = "handlers.convert.New"  reqId := middleware.GetReqID(r.Context())  log = log.With( slog.String("op", op), slog.String("request_id", reqId), )  err := r.ParseForm()  if err != nil { log.Error("failed to decode request body", sl.Err(err)) render.JSON(w, r, resp.Error("failed to decode request body", 152)) }  task, err := prepareOptions(r.Form, reqId)  if err != nil { log.Error("failed to prepare options", sl.Err(err)) render.JSON(w, r, resp.Error("failed to parse task", 0)) return }  if task.Queue == "" { task.Queue = rabbit.DefaultQueue() log.Warn("not found queue. Set default", slog.String("default_queue", task.Queue)) }  taskMsg, err := json.Marshal(task)  if err != nil { log.Error("Error parse request", sl.Err(err))  render.JSON(w, r, resp.Error("Error parse request", 0))  return }  err = rabbit.Publish(task.Queue, taskMsg) if err != nil { log.Error("error publish task", slog.String("queue", task.Queue), sl.Err(err)) render.JSON(w, r, resp.Error("error publish task", 0)) } render.JSON(w, r, resp.Success()) } }

FileUploader 

Создадим структуру FileUploader и реализуем следующие базовые методы:

  • Download – скачивание конвертируемого файла из Б24;

  • uploadFile – загрузка готового файла в Б24;

  • Complete – отправка запроса о завершении конвертации;

  • getUploadInfo – получение информации о загружаемом файле.

    Для каждого запроса заводим структуры. Оборачиваем запросы в функцию retry.Do из библиотеки avast/retry-go. Устанавливаем 3 попытки на подключение (на случай сетевых проблем).

// internal/lib/fileuploader/fileuploader.go Пример метода получения информации о файле func (f *FileUploader) getUploadInfo(file string, key string) (*uploadInfoResp, error) { fileInfo, err := os.Stat(file)  if err != nil { return nil, fmt.Errorf("error get file info [%s]: [%w]", file, err) }  uploadReq := uploadInfoRequest{ FileId:   key, FileSize: fileInfo.Size(), Upload:   "where", }  v, err := query.Values(uploadReq)  if err != nil { return nil, fmt.Errorf("error convert struct request to query: [%w]", err) }  res, err := http.PostForm(f.url, v)  if err != nil { return nil, fmt.Errorf("error get upload info from [%s]: [%w]", f.url, err) }  var uploadInfoRes uploadInfoResp  body, err := io.ReadAll(res.Body)  if err != nil { return nil, fmt.Errorf("wrong response upload info request to url [%s]: [%w]", f.url, err) }  if err = json.Unmarshal(body, &uploadInfoRes); err != nil { return nil, fmt.Errorf("error unmarshal upload info request to url [%s]: [%w]", f.url, err) }  return &uploadInfoRes, nil } 

Клиент RabbitMQ

Создаем структуру RabbitMQ и реализуем базовые методы:

  • Connect — подключение к rabbitMQ;

  • Reconnect — реконнект в случае разрыва соединения, запускаем в отдельной горутине;

  • InitQueue — инициализация основной очереди и Dead Letter;

  • Consume — чтение сообщений из очереди;

  • Publish — отправка сообщения в очередь.

// internal/lib/rabbitmq/rabbitmq.go package rabbitmq  import ( "bitrix-converter/internal/config" "bitrix-converter/internal/lib/logger/sl" "context" "fmt" amqp "github.com/rabbitmq/amqp091-go" "log/slog" "time" )  type Rabbit struct { conn *amqp.Connection log  *slog.Logger cfg  config.RabbitConfig }  func New(log *slog.Logger, cfg config.RabbitConfig) *Rabbit { return &Rabbit{ log: log, cfg: cfg, } }  func (r *Rabbit) DefaultQueue() string { return r.cfg.DefaultQueue }  func (r *Rabbit) Connect() error { url := fmt.Sprintf("amqp://%s:%s@%s:%s", r.cfg.User, r.cfg.Password, r.cfg.Host, r.cfg.Port)  conn, err := amqp.Dial(url) if err != nil { return fmt.Errorf("failed to connect to RabbitMQ: [%w]", err) }  r.conn = conn return nil }  func (r *Rabbit) Channel() (*amqp.Channel, error) { return r.conn.Channel() }  func (r *Rabbit) Reconnect() { for { _, ok := <-r.conn.NotifyClose(make(chan *amqp.Error)) if !ok { r.log.Error("failed notifying rabbitMQ channel. Reconnecting...") } r.log.Error("rabbitmq connection closed unexpectedly. Reconnecting...")  for {  err := r.Connect()  if err == nil { r.log.Info("rabbitMQ reconnect success") break }  r.log.Error("rabbitmq reconnect failed. Retry after 10 seconds", sl.Err(err)) time.Sleep(10 * time.Second) }  } }  func (r *Rabbit) InitQueue(ch *amqp.Channel, queue string) error {  dlQueue := queue + "_dead"  _, err := ch.QueueDeclare( dlQueue, true, false, false, false, amqp.Table{}, )  if err != nil { return fmt.Errorf("failed to declare dead letter queue: [%w]", err) }  _, err = ch.QueueDeclare( queue, true, false, false, false, amqp.Table{ "x-dead-letter-exchange":    "", "x-dead-letter-routing-key": dlQueue, "x-message-ttl":             60480000, }, )  if err != nil { return fmt.Errorf("failed to declare queue: [%w]", err) } return nil }  func (r *Rabbit) Consume(ch *amqp.Channel, queue string) (msgs <-chan amqp.Delivery, err error) { msgs, err = ch.Consume( queue, "", false, false, false, false, nil, ) if err != nil { return nil, fmt.Errorf("failed to consume queue: [%w]", err) } return msgs, nil }  func (r *Rabbit) Publish(queue string, message []byte) error { ch, err := r.Channel() if err != nil { return fmt.Errorf("failed to open channel: [%w]", err) }  defer ch.Close()  ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel()  err = ch.PublishWithContext( ctx, "", queue, false, false, amqp.Publishing{ ContentType: "text/plain", Body:        message, }, )  if err != nil { return fmt.Errorf("failed to publish message: [%w]", err) } return nil }  func (r *Rabbit) Connection() *amqp.Connection { return r.conn } 

Command

Создадим BaseCommand с общим методом Execute и две реализации: DocumentCommand (LibreOffice) и VideoCommand (MPEG).

// internal/lib/command/command.go func (bs *BaseCommand) Execute() error {  if err := bs.validate(); err != nil { return fmt.Errorf("failed validate transform task: [%w]", err) }  directory := bs.DownloadDir()  err := os.MkdirAll(directory, 0755)  if err != nil { return fmt.Errorf("error creating directory [%s]: [%w]", directory, err) }  filePath := bs.genTmpFilePath(directory)  err = retry.Do( func() error { return bs.uploader.Download(bs.task.File, filePath, bs.MaxSize()) }, retry.Attempts(3), retry.OnRetry(func(n uint, err error) { time.Sleep(1 * time.Second) }), )  bs.uploader.AddFileToDelete(filePath)  defer bs.uploader.DeleteFiles()  if err != nil { return fmt.Errorf("error download file [%s]: [%w]", bs.task.File, err) }  bs.file = filePath  for _, format := range bs.task.Formats {  if _, ok := bs.files[format]; ok { continue } pre, err := bs.preConvert(format, filePath) if err != nil { return err } if pre { continue }  convertedFile, err := bs.transform(format, filePath) bs.uploader.AddFileToDelete(convertedFile) if err != nil { return fmt.Errorf("error transform file [%s] to [%s]: [%w]", bs.task.File, format, err) } bs.files[format] = convertedFile }  bs.uploader.SetFiles(bs.files)  err = bs.uploader.UploadFiles() if err != nil { return fmt.Errorf("error uploading files: [%w]", err) }  err = bs.uploader.Complete() if err != nil { return fmt.Errorf("failed complete: [%w]", err) } return nil }

Consumer

В текущей реализации запускаем 3 горутины на каждую очередь, в одном контейнере.

Настроим graceful завершение (с таймаутом в 5 минут) и реконнект к rabbitmq (в случае разрыва соединения обработчики будут подключаться заново и после восстановления соединения продолжат работу в штатном режиме)

// cmd/consumer/main.go func main() {  cfg := config.MustLoad()  logger := sl.SetupLogger(cfg.Env)  rabbit := rabbitmq.New(logger, cfg.Rabbit)  conErr := rabbit.Connect() if conErr != nil { log.Fatalf("failed connect to RabbitMQ with start %v", conErr) }  go rabbit.Reconnect()  cancelCtx, cancel := context.WithCancel(context.Background()) wg := &sync.WaitGroup{} for i := 0; i <= 3; i++ { for _, queue := range queues {  go func() { uniqId := fmt.Sprintf("%s_%d", queue, i) logger.Info("start consumer", slog.String("queue", queue)) wg.Add(1) done: for { time.Sleep(10 * time.Second) ch, err := rabbit.Channel() if err != nil { logger.Error("failed to open channel. Retry", slog.String("queue", queue), sl.Err(err)) continue } defer ch.Close()  err = rabbit.InitQueue(ch, queue)  if err != nil { logger.Error("failed init queue. Retry", slog.String("queue", queue), sl.Err(err)) continue }  logger.Info("success init queue", slog.String("queue", queue)) msgs, err := rabbit.Consume(ch, queue)  if err != nil { logger.Error("failed consume. Retry", slog.String("queue", queue), sl.Err(err)) continue }  logger.Info("success consume. Waiting messages", slog.String("queue", queue)) closed: for { select { case <-cancelCtx.Done(): break done default: } select { case d, ok := <-msgs: if !ok { logger.Info("channel closed", slog.String("queue", queue)) break closed } handleMessage(d, logger, cfg, uniqId) default: }                         time.Sleep(1 * time.Second) } } wg.Done() }()  } } waitCh := make(chan struct{}) ch := make(chan os.Signal) signal.Notify(ch, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT, syscall.SIGKILL)  <-ch logger.Info("receive a shutdown signal") go func() { logger.Info("cancel, wait consumer") cancel() wg.Wait() close(waitCh) }()  select { case <-waitCh: logger.Info("graceful shutdown") case <-time.After(5 * time.Minute): logger.Info("shutdown before 5 minutes timeout") }  } 

Логирование

Для логирования используем библиотеку slog, для группировки логов используем параметр request_id.

Dead Letter Queue

Для каждой очереди инициализируем одноимённую очередь с префиксом dead.

В случае ошибки конвертации делаем reject, и задание попадает в dead очередь, для последующего анализа. По умолчанию время жизни сообщения — неделя.

Где посмотреть?

Сервер разместил на GitHub, текущая версия 1.0.0. В дальнейшем возможно будет обновляться. Инструкция по развертыванию прилагается.

Итого

Надеюсь, это решение упростит отказ от CentOS там, где это необходимо. При возникновении проблем создавайте issue на GitHub.


ссылка на оригинал статьи https://habr.com/ru/articles/912446/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *