Asterisk — это болид «Формулы-1», а не рейсовый автобус

от автора

Asterisk — фи, это же моветон

Здравствуйте уважаемые читатели этого замечательного ресурса. По уже сложившейся традиции — являюсь давним читателем habr’а, но только сейчас решил сделать пост. Что, собственно, побудило к написанию? Честно сказать, и сам не знаю. То ли притянутые статьи о производительности FreeSWITCH/Yate/3CX/etc в сравнении с Asterisk, то ли действительные, реальные проблемы архитектуры последнего, а, возможно, желание сделать что-нибудь уникальное.

И что удивительно, в первом случае, как правило, сравнивают мягкое и теплое, так сказать, FreeSWITCH/Yate/etc и FreePBX. Да-да, именно FreePBX. Это не опечатка. Причем интересно, что во всех сравнениях зачастую один Asterisk в дефолтной конфигурации. Ну, вы знаете, эта конфигурация — загруженные все имеющиеся модули, кривой диалплан (FreePBX как бы способствует) и куча остальной необъективщины. Что до родовых болячек Asterisk’а — да, объективно их вагон и маленькая тележка.

Что со всем этим делать? Разрушать стереотипы и исправлять родовые травмы. Этим и займемся.

Скрещиваем ежа с ужом

Многие из новичков испытывают дискомфорт, глядя на синтаксис описания диалплана в Asterisk’е, а некоторые на полном серьезе обосновывают выбор другого сервера телефонии именно необходимостью писать диалплан в том виде, в котором он есть по дефолту. Типа, перелопачивать многострочный XML — это верх комфорта. Да, есть возможность юзать LUA/AEL, и это хорошо. Но лично я отнес бы эту возможность в минусы и в частности то, что касается pbx_lua.

Как уже было сказано, иметь возможность описывать диалплан полноценным языком программирования — это хорошо. Проблема в том, что время жизни скрипта и его окружения равно времени жизни канала. Для каждого нового канала запускается свой экземпляр скрипта, следовательно, прощай, разделяемые между каналами переменные, единичная загрузка сторонних модулей, один разделяемый коннект к базе и т.д., и т.п. Строго говоря, от встраиваемого языка описания сценария этого и не нужно, но уж сильно хочется. А если хочется – значит, надо.

Итак, от классического Asterisk’а возьмем принципы pbx_lua, от Yate возьмем модель маршрутизации, а от FreeSWITCH ничего брать не будем, ибо "overhead" не нужен. ОК, с тем, что нам нужно родить, определились. Что же будем использовать для генетических экспериментов:

  • Asterisk, причем хотелось бы без привязок к версии. Тот же ARI был анонсирован, если мне не изменяет память, с 12-й версии. Если учесть, что до сих пор где то юзаются 1.8/1.6, а возможно и 1.4, то зависимость от версионных плюшек нам не нужна.
  • Lua — замечательный, гибкий и крайне функциональный скриптовый язык. Сам бог велел, так сказать, без комментариев.
  • Lunapark — интересный проект на github’е, своего рода сервер voip-приложений.

Про Lunapark стоит рассказать подробнее. Это сервер, реализующий потенциал AMI-протокола в связке с классическим FastAGI, что немаловажно в едином пространстве выполнения. То есть, получаем аналог ARI посредством тесной кооперации AGI и AMI в одном флаконе.

Предвижу логичный вопрос: для чего это все? Есть же Asterisk REST Interface, чей функционал ты тут пытаешься переизобрести! Ответ на этот вопрос неоднозначен. Согласен, ARI декларирует ряд преимуществ: да, он асинхронен, да, позволяет работать с "сырыми" примитивами, WebSockets и да, стильный, модный, молодежный XML/JSON — куда ж без него. Но, черт возьми, часть этих так называемых преимуществ крайне сомнительна и добавляет один, а то и более уровней абстракции. Другая же часть — вообще не преимущества. Преимущества — это когда что-то свойственно только тебе, ниже мы это увидим на примере той же асинхронности.

Как это работает? Стандартными средствами заворачиваем канал в FastAGI-приложение, внутри которого получаем возможность управлять звонком, как будто юзаем pbx_lua с незначительным изменением синтаксиса. Вишенкой на торте является возможность управлять состоянием самого Asterisk’а и окружением канала, для этого в распоряжении текущего FastAGI-приложения есть глобальный AMI-объект. Кстати, можно не заворачивать канал в FastAGI-приложение, а создать глобальный обработчик события, допустим, для NewChannel. А это уже преимущество по сравнению с ARI, там как известно, вне stasis’а ARI слеп.

Реализован Lunapark в лучших традициях кооперативной многозадачности, а именно всеми любимая асинхронность на сопрограммах. И как следствие отсутствие проблем с "shared data". То есть плюсы присутствуют, но и проблемы появляются. Одна из них — это необходимость описывать логику с оглядкой на асинхронность, но я думаю, это мы как-нибудь переживем.

Что дальше?

Синтаксис описания контекстов — а что же с ним не так? Да все с ним нормально, более того, практику написания диалплана нужно прописывать как профилактику для формирования структурированного мышления у новичков. Но, вместе с тем нужно понижать порог вхождения. Поэтому будем упрощать и в то же время добавлять функционала.

Простой пример:

[test] exten => _XXX/102,1,Hangup() exten => _XXX,1,Dial(SIP/${EXTEN})

В этом примере идет дозвон до трехзнака, кроме абонента 102. Вроде бы все логично и лаконично за исключением того, что шаблоны соответствия экстеншена ограничены небольшим набором правил, а так называемая extended маршрутизация возможна только по CallerID звонящего. А хотелось бы, к примеру, по CallerIDName или по текущему состоянию звонящего канала, а возможно по имени самого канала, а если реализовать полноценный regexp, так вообще красота. И да, я знаю, все эти хотелки можно реализовать, расписав контекст в таком виде:

[test] exten => _XXX/102,1,Hangup()  ; по CallerIDName exten => _XXX,1,ExecIf($[ "${CALLERID(name)}" == "Vasya" ]?Hangup())  ; По состоянию канала exten => _XXX,n,ExecIf($[ "${CHANNEL(state)}" != "Ring" ]?Hangup())  ; По имени канала exten => _XXX,n,ExecIf($[ "${CUT(CUT(CHANNEL,-,1),/,2)}" == "333" ]?Hangup())  exten => _XXX,n,Dial(SIP/${EXTEN})

Но мой внутренний перфекционист начинает бунтовать при виде такого, а если представить аналогичную выборку по всем пользователям, да еще и действия нужны разные и посложнее Hangup’а, то extensions.conf превращается в длииинную портянку вызовов Goto, GoSub, Macro и, не дай бог, с каналами типа Local.

Выход один — прикручивать свои правила маршрутизации с подкидным и дамами с низкой социальной ответственностью.

В качестве примера:

${Exten}:match('%d%d%d')            and  (   ${CallerIDNum}:match('201') or    ${CallerIDName}:match('Vasya') or    ${State}:lower() ~= 'ring' or    ${Channel}:match('^[^/]+/([^%-]+)') == '333' ) => Hangup();  ${Exten}:match('%d%d%d') => Dial {callee = ('SIP/%s'):format(${Exten})};

Хм, вырвиглазненько получилось, но на удивление читается и понимается с первого взгляда. А самое главное, что у нас появился аналог regexp’ов и группировка правил на действие, что, несомненно, упростит составление маршрутов в будущем.

Что тут думать, прыгать надо.

В итоге имеем Lunapark как замену pbx_lua. Его средствами нам и нужно создать логику обработки нашей модели маршрутизации. Для начала нужно распарсить набор правил и заменить все вхождения ${...} на соответствующие им значения, то есть привести к виду ('...'). Значения будут браться из окружения текущего канала.
Затем приводим каждое правило к виду условного оператора, чтобы получить нечто похожее:

-- Exten = 123 -- Sate = Ring -- CallerIDNum = 100 -- CallerIDName = Test -- Channel = SIP/100-00000012c  if ('123'):match('%d%d%d') and (   ('100'):match('201') or   ('Test'):match('Vasya') or   ('Ring'):lower() ~= 'ring' or   ('SIP/100-00000012c'):match('^[^/]+/([^%-]+)') == '333' ) then   Hangup() end  if ('123'):match('%d%d%d') then   Dial {callee = ('SIP/%s'):format(('123'))} end

Делать это будут две функции fmt и syntax соответственно:

local fmt = function(str, tab)  return (str:gsub('(%${[^}{]+})', function(w)   local mark = w:sub(3, -2)     return (mark:gsub('(.+)',function(v)    local out = tab[v] or v     return ("('%s')"):format(out)   end))  end)) end  local syntax = function(str)  return (str:gsub('([^;]+)=>([^;]+)',function(p,r)   return ([[     if %s then     %s    end   ]]):format(p,r)  end)) end

В принципе ничего сложного, все просто и понятно. Идем дальше — считывать наши правила будем из файла в переменную при старте сервиса, а парсить их уже при звонке с актуальным окружением. Считыванием правил займется функция routes.

local routes = function(...)   local conf, content = ...    local f, err = io.open(conf, "r")    if io.type(f) ~= 'file' then    log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у    return ""   else    content = f:read('*all')   end    f:close() return content end

Осталось сделать две вещи: завернуть звонки в Lunapark и соответственно их обработать с учетом наших маршрутов. Тут стоит немного пояснить такой момент — в Lunapark вся логика описывается в handler’е. Это текстовый файл, в котором мы будем определять наши FastAGI-приложения и работать с AMI и нашими маршрутами.

Как уже было сказано, объект AMI — глобальный и, помимо роли AMI-клиента, может устанавливать своего рода слушатели для конкретных AMI событий. Этим мы и воспользуемся, но для начала сделаем некоторые приготовления в extensions.conf.

[default] exten => _[hit],1,NoOp() exten => _.,n,Wait(5)  exten => _.,1,AGI(agi://127.0.0.1/${EXTEN}${IF($[ "X${PRMS}" != "X" ]?"?${PRMS}")})

Wait(5) в примере выше позволит нам не обрывать канал при завершении FastAGI-приложения, так как в маршрутах может быть описано несколько приложений, а выполнение их осуществляется по средствам Redirect на контекст default по ${EXTEN}.

Таким образом, беря во внимание все выше описанное и помня о кооперативной природе Lunapark’а, попробуем закодить логику обработки маршрутов через FastAGI-приложения.

-- Считываем наши правила в переменную rules local rules = routes('routes.conf') -- Очищаем все обработчики, таким образом очистятся только не именные обработчики -- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT ami.removeEvents('*') -- Обработчик события создания нового канала ami.addEvents {  ['newchannel'] = function(e)   -- Условия, только каналы с набором и каналы с контекстом users   if (e['Context'] and e['Context']:match('users')) and e['Exten'] then    -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся    local step    -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения    local count = 0    -- Парсим маршруты для текущего окружения канала    local code, err = loadstring(syntax(fmt(rules,e)))     -- В описании маршрутов нет ошибок, двигаемся дальше    if type(code) == 'function' then     -- Проксируем будущие FastAGI приложения      setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)      -- Вот они последствия кооперативности      return coroutine.wrap(       function(...)        local prms = {} -- Будущие параметры FastAGI приложения        local owner = t -- Копия окружения        local event = e -- Копия таблицы event        local thread = coroutine.running() -- ID текущей сопрограммы         -- Парсим параметры и приводим к виду URI        for p,v in pairs({...}) do         if type(v) == 'table' then          for key, val in pairs(v) do           table.insert(prms,("%s=%s"):format(key,val))          end         else          table.insert(prms,("%s=%s"):format(p,v))         end        end        -- Если это не первое FastAGI приложение в цепочке        if step then         -- Запоминаем предыдущее перед этим         local last = ("%s"):format(step)         -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям         -- И записываем в таблицу indexes(в окружении) их порядковые номера         -- Именные обработчики требуют последующего удаления самостоятельно         table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)          -- Ловим событие AGIStatus указывающее на завершение приложения          -- Если это предыдущее перед нами, пробуждаем сопрограмму          if (evt['Channel'] and evt['Channel'] == event['Channel'])                and           (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))                and           (evt['Script'] and evt['Script'] == last)          then           -- Соответствие порядкового номера нашей сопрограмме           -- В цепочке может быть вызов одного приложения несколько раз           -- Это позволит выполнять сопрограммы в порядке их определения           if owner['indexes'][count] == thread then            if coroutine.status(thread) ~= 'dead' then             coroutine.resume(thread)            end           end          end         end,thread))         -- Устанавливаем маркер текущего FastAGI приложения         step = k         -- Приостанавливаем сопрограмму         coroutine.yield()        else -- Здесь обрабатывается первое FastAGI приложение в цепочке         local index -- Индекс для обработчика Hangup события         -- Устанавливаем маркер текущего FastAGI приложения         step = k         -- Добавляем ИМЕННОЙ обработчик события Hangup для канала         -- В этом месте подчищаем за собой         index = ami.addEvent('Hangup',function(evt)          if evt['Channel'] and evt['Channel'] == event['Channel'] then           -- Удаляем обработчик событие Hangup по ранее запомненному индексу           ami.removeEvent('Hangup',index)           -- Удаляем все обработчики цепочек выполнения по индексу           for _,v in pairs(owner['indexes']) do            ami.removeEvent('UserEvent',v)           end           -- Делаем приятно сборщику мусора           owner = nil          end         end,thread)        end        -- По средствам AMI выставляем переменную для канала и вызова в цепочке        ami.setvar{         Value = table.concat(prms,'&'),         Channel = event['Channel'],         Variable = 'PRMS'        }        -- Перенаправляем канал на AGI-приложение через контекст default        ami.redirect{         Exten = k,         Priority = 1,         Channel = event['Channel'],         Context = 'default'        }        -- Выставляем индекс приложения        count = count + 1       end)     end}))()    else     -- Если что-то пошло не так     log.warn(err)    end   end  end }

Для тех, кто не смотрел или не понял код, поясню простым языком, что же мы там накодили. Для каждого действия в маршруте создается сопрограмма с копированием всего, чего нужно через замыкания и в приостановленном состоянии. Кроме сопрограммы первого действия, она выполняется как есть. Приостанавливаем мы их для эмуляции последовательного выполнения, то есть строго одна за другой, вторая после первой, третья после второй и т.д.

Стоп, так они и так друг за другом выполняются. На самом деле нет — если не эмулировать синхронность, то асинхронный redirect пробежится по каждому действию и займет это доли секунды. В нашем же коде мы выполняем каждое действие, по наступлению определенного события, а именно завершении предыдущего FastAGI-приложения. Lunapark заботливо генерирует специальный UserEvent по окончании выполнения каждого FastAGI-приложения с соответствующими параметрами — вот на это событие и ориентируемся. Сами же сопрограммы просто редиректят текущий канал в контекст default с экстеншном, равным текущему действию, предварительно установив переменную канала PRMS.

Самое интересное, что звонок после redirect’а придет опять в handler, но уже в контексте выполнения AGI и на соответствующее приложение. В нашем случае это Hangup() и Dial(). Давайте же напишем их для полноты повествования.

function Hangup(...)   local app, channel = ... -- В этом отличие от pbx_lua    app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))   app.hangup() end  function Dial(...)   local app, channel = ...   local leg = app.agi.params['callee'] or ''    app.verbose(('Trying to make a call from %s to %s'):format(    channel.get('CALLERID(num)'),    leg:match('^[^/]+/([^%-]+)'))   )   app.dial(leg) end

Ну, вот и все — допрыгались

Итак, давайте подытожим. Что же мы получили в результате этих генетических экспериментов?

  • гибкий, функциональный подход описания маршрутов;
  • возможность создания полнофункциональных VoIP-приложений. Теперь нам не нужно приложение Queue, мы можем его сами написать, не заморачиваясь с созданием своего собственного модуля для asterisk’а;
  • вынесли логику формирования, управления звонками на сторону сервера VoIP-приложений, тем самым сделав из asterisk’а Mediahub, что позволило повысить производительность VoIP-системы в целом;
  • возможность использовать достаточно простой, расширяемый и очень гибкий скриптовый язык для создания VoIP-приложений;
  • расширили возможности интеграции с внешними системами из VoIP-приложений.

Кому как, а мне пока все нравится.

handler целиком

local fmt = function(str, tab)  return (str:gsub('(%${[^}{]+})', function(w)   local mark = w:sub(3, -2)     return (mark:gsub('(.+)',function(v)    local out = tab[v] or v     return ("('%s')"):format(out)   end))  end)) end  local syntax = function(str)  return (str:gsub('([^;]+)=>([^;]+)',function(p,r)   return ([[     if %s then     %s    end   ]]):format(p,r)  end)) end  local routes = function(...)   local conf, content = ...    local f, err = io.open(conf, "r")    if io.type(f) ~= 'file' then    log.warn(err)  -- Глобальный LOG объект доступный благодаря Lunapark'у    return ""   else    content = f:read('*all')   end    f:close() return content end  -- Считываем наши правила в переменную rules local rules = routes('routes.conf') -- Очищаем все обработчики, причем таким образом очистятся только неименные обработчики событий -- Это даст возможность не затирать цепочку выполнения при сигналах HUP/QUIT ami.removeEvents('*') -- Обработчик события создания нового канала ami.addEvents {  ['newchannel'] = function(e)   -- Условия, только каналы с набором и каналы с контекстом users   if (e['Context'] and e['Context']:match('users')) and e['Exten'] then    local step -- Переменная, указывающая на выполнении, какого FastAGI приложения мы находимся    local count = 0 -- Будущий порядковый номер FatsAGI приложения в цепочке выполнения    -- Парсим маршруты для текущего окружения канала    local code, err = loadstring(syntax(fmt(rules,e)))     -- В описании маршрутов нет ошибок, двигаемся дальше    if type(code) == 'function' then     -- Проксируем будущие FastAGI приложения      setfenv(code,setmetatable({indexes = {}},{__index = function(t,k)      -- Вот они последствия кооперативности      return coroutine.wrap(       function(...)        local prms = {} -- Будущие параметры FastAGI приложения        local owner = t -- Копия окружения        local event = e -- Копия таблицы event        local thread = coroutine.running() -- ID текущей сопрограммы         -- Парсим параметры и приводим к виду URI        for p,v in pairs({...}) do         if type(v) == 'table' then          for key, val in pairs(v) do           table.insert(prms,("%s=%s"):format(key,val))          end         else          table.insert(prms,("%s=%s"):format(p,v))         end        end        -- Если это не первое FastAGI приложение в цепочке        if step then         -- Запоминаем предыдущее перед этим         local last = ("%s"):format(step)         -- Добавляем ИМЕННЫЕ обработчики события UserEvent по доп. условиям         -- И записываем в таблицу indexes(в окружении) их порядковые номера         -- Именные обработчики требуют последующего удаления самостоятельно         table.insert(owner['indexes'],ami.addEvent('UserEvent',function(evt)          -- Ловим событие AGIStatus указывающее на завершение приложения          -- Если это предыдущее перед нами, пробуждаем сопрограмму          if (evt['Channel'] and evt['Channel'] == event['Channel'])                and           (evt['UserEvent'] and evt['UserEvent']:match('AGIStatus'))                and           (evt['Script'] and evt['Script'] == last)          then           -- Соответствие порядкового номера нашей сопрограмме           -- В цепочке может быть вызов одного приложения несколько раз           -- Это позволит выполнять сопрограммы в порядке их определения           if owner['indexes'][count] == thread then            if coroutine.status(thread) ~= 'dead' then             coroutine.resume(thread)            end           end          end         end,thread))         -- Устанавливаем маркер текущего FastAGI приложения         step = k         -- Приостанавливаем сопрограмму         coroutine.yield()        else -- Здесь обрабатывается первое FastAGI приложение в цепочке         local index -- Индекс для обработчика Hangup события         -- Устанавливаем маркер текущего FastAGI приложения         step = k         -- Добавляем ИМЕННОЙ обработчик события Hangup для канала         -- В этом месте подчищаем за собой         index = ami.addEvent('Hangup',function(evt)          if evt['Channel'] and evt['Channel'] == event['Channel'] then           -- Удаляем обработчик событие Hangup по ранее запомненному индексу           ami.removeEvent('Hangup',index)           -- Удаляем все обработчики цепочек выполнения по индексу           for _,v in pairs(owner['indexes']) do            ami.removeEvent('UserEvent',v)           end           -- Делаем приятно сборщику мусора           owner = nil          end         end,thread)        end        -- По средствам AMI выставляем переменную для канала и вызова в цепочке        ami.setvar{         Value = table.concat(prms,'&'),         Channel = event['Channel'],         Variable = 'PRMS'        }        -- Перенаправляем канал на AGI-приложение через контекст default        ami.redirect{         Exten = k,         Priority = 1,         Channel = event['Channel'],         Context = 'default'        }        -- Выставляем индекс приложения        count = count + 1       end)     end}))()    else     -- Если что-то пошло не так     log.warn(err)    end   end  end }  function Hangup(...)   local app, channel = ... -- В этом отличие от pbx_lua    app.verbose(('The Channel %s does not match by routing rules'):format(channel.get('CHANNEL')))   app.hangup()  end  function Dial(...)   local app, channel = ...   local leg = app.agi.params['callee'] or ''    app.verbose(('Trying to make a call from %s to %s'):format(    channel.get('CALLERID(num)'),    leg:match('^[^/]+/([^%-]+)'))   )   app.dial(leg)  end

ссылка на оригинал статьи https://habr.com/ru/post/513572/


Комментарии

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *