Самодельный DDNS

от автора

Продолжаю развивать свой домашний сервачок, для удобного добавления сервисов понадобились поддомены . Так как за статический IP своему провайдеру я платить не хочу, то я использовал DDNS от TP-Link. И адрес выглядел https://my-adress.tplinkdns.com. TP-Link даёт только один поддомен и поддомены 2-го уровня создать нет возможности.

Поэтому думал использовать ddclient и Cloudfare. Выяснилось что Cloudfare больше не работает с ru зоной. Вот расисты!

Поискал хостеров с API для управления DNS-записями. Какое моё было удивление, что сейчас регистраторы требуют столько много за домен. Ещё и вводят в заблуждения всяческими способами. Первый год берут немного, а потом тысячами. Например REG.ru за домен на ru берёт в первый год 169 рублей, типо по акции и со скидкой 25%. А стоимость продления всячески скрывается, только в дебрях документации можно найти, что они возьмут 1424 рубля за следующие годы.

Хостер, которым я пользуюсь с 2014, не вводил меня в заблуждение. И я раньше платил за ru зону 99 рублей, шло время и сейчас мне обходится 299 рублей. Правда я нахожусь на архивном тарифе, которого давно уже нет.

В общем выбор пал на Beget. Прозрачные условия и я уже пользовался их впсками. Купил у них доменное имя в ru зоне за 199 рублей, продление будет стоить 420. Зарегал поддомены для своих текущих сервисов. На домене прописал A-запись, указал свой внешний IP. В поддоменах прописал CNAME, ведущий на основной домен.

У меня на сервере будет запускаться скрипт по таймеру, проверять внешний IP. Если он изменился, то менять A-запись у домена, через API Beget. Особенность апишки, при обновлении DNS записи обновляется весь набор записей для FQDN. Поэтому скрипт сначала получает данные о записи, а потом только обновляет её.

В скрипте есть логгирование и ротация логов производится этим же скриптом.

Переменные окружения скрипта: Обязательные:

  • BEGET_LOGIN — логин Beget API

  • BEGET_PASSWORD — пароль Beget API

  • BEGET_FQDN — корневой домен, например my-adress.ru Опциональные:

  • BEGET_A_PRIORITY — приоритет A (по умолчанию 10)

  • STATE_FILE — файл, куда запоминать последний IP (по умолчанию .state/last_ip.txt рядом со скриптом)

  • IP_URL — URL, который возвращает внешний IP в тексте (по умолчанию https://api.ipify.org) Логи:

  • LOG_FILE — путь к файлу лога (по умолчанию logs/beget-ddns.log рядом со скриптом)

  • LOG_MAX_BYTES — размер файла лога до ротации (по умолчанию 1_000_000)

  • LOG_BACKUP_COUNT — сколько ротированных файлов хранить (по умолчанию 5)

  • LOG_LEVEL — уровень логирования (INFO, DEBUG, …; по умолчанию INFO)

#!/usr/bin/env python3from __future__ import annotationsimport jsonimport loggingimport logging.handlersimport osimport reimport sysimport timeimport urllib.parseimport urllib.requestfrom pathlib import Pathfrom typing import Any, Dict, List, TupleBEGET_API_BASE = "https://api.beget.com/api/dns"LOG = logging.getLogger("beget-ddns")def _setup_logging() -> None:    script_dir = Path(__file__).resolve().parent    log_file = Path(os.environ.get("LOG_FILE", str(script_dir / "logs" / "beget-ddns.log"))).expanduser()    log_file.parent.mkdir(parents=True, exist_ok=True)    max_bytes = int(os.environ.get("LOG_MAX_BYTES", str(1_000_000)))    backup_count = int(os.environ.get("LOG_BACKUP_COUNT", str(5)))    level_name = os.environ.get("LOG_LEVEL", "INFO").upper()    level = getattr(logging, level_name, logging.INFO)    LOG.setLevel(level)    handler = logging.handlers.RotatingFileHandler(        log_file,        maxBytes=max_bytes,        backupCount=backup_count,        encoding="utf-8",    )    formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")    handler.setFormatter(formatter)    LOG.addHandler(handler)    # Also emit to stdout (useful under systemd/journal).    stream = logging.StreamHandler(sys.stdout)    stream.setFormatter(formatter)    LOG.addHandler(stream)def _env_required(name: str) -> str:    value = os.environ.get(name)    if not value:        raise RuntimeError(f"Missing required env var: {name}")    return valuedef _http_get_json(url: str, timeout_s: int = 20) -> Any:    req = urllib.request.Request(url, headers={"User-Agent": "jhon-mosk-beget-ddns/1.0"})    with urllib.request.urlopen(req, timeout=timeout_s) as resp:        body = resp.read()    return json.loads(body.decode("utf-8"))def _http_get_text(url: str, timeout_s: int = 20) -> str:    req = urllib.request.Request(url, headers={"User-Agent": "jhon-mosk-beget-ddns/1.0"})    with urllib.request.urlopen(req, timeout=timeout_s) as resp:        body = resp.read()    return body.decode("utf-8").strip()def _beget_call(method: str, login: str, password: str, input_data: Dict[str, Any]) -> Any:    query = {        "login": login,        "passwd": password,        "input_format": "json",        "output_format": "json",        "input_data": json.dumps(input_data, ensure_ascii=False, separators=(",", ":")),    }    url = f"{BEGET_API_BASE}/{method}?{urllib.parse.urlencode(query)}"    payload = _http_get_json(url)    dump_path = os.environ.get("DEBUG_DUMP_JSON")    if dump_path:        try:            Path(dump_path).expanduser().write_text(                json.dumps(payload, ensure_ascii=False, indent=2) + "\n",                encoding="utf-8",            )        except Exception as e:            LOG.warning("failed to write DEBUG_DUMP_JSON=%s: %s", dump_path, e)    return payloaddef _unwrap_beget_payload(payload: Any) -> Any:    """    Beget API responses are not consistently documented.    We try to normalize the response by unwrapping common envelopes:    - {"answer": {...}} (with status, errors, and/or result/data)    - {"answer": {"status": "error", "errors": [...]}} -> raise a helpful error    """    if isinstance(payload, dict) and "answer" in payload:        answer = payload["answer"]        if isinstance(answer, dict):            status = answer.get("status")            if status == "error":                errors = answer.get("errors") or []                # best-effort extract                err_texts: List[str] = []                if isinstance(errors, list):                    for e in errors:                        if isinstance(e, dict):                            t = e.get("error_text") or e.get("text") or e.get("message")                            if isinstance(t, dict):                                t = t.get("text") or t.get("type") or str(t)                            if t:                                err_texts.append(str(t))                        elif isinstance(e, str):                            err_texts.append(e)                msg = "; ".join(err_texts) or repr(errors) or "unknown error"                raise RuntimeError(f"Beget API error: {msg}")            # Success case: sometimes data is nested            for key in ("result", "data"):                if key in answer:                    return answer[key]        return answer    return payloaddef _extract_records_from_getdata(payload: Any) -> Dict[str, List[Dict[str, Any]]]:    """    Beget getData docs show an "array" response with a dict-like content. In practice    the API may return either:    - a list with one object/dict inside, or    - a dict directly.    We normalize to the records dict.    """    payload = _unwrap_beget_payload(payload)    if isinstance(payload, list) and payload:        obj = payload[0]    else:        obj = payload    if not isinstance(obj, dict):        raise RuntimeError(f"Unexpected getData response type: {type(obj)}")    # Some shapes may nest actual object under known keys.    for key in ("result", "data"):        if key in obj and isinstance(obj[key], list) and obj[key]:            candidate = obj[key][0]            if isinstance(candidate, dict):                obj = candidate                break    records = obj.get("records")    if not isinstance(records, dict):        LOG.debug("getData raw object keys: %s", sorted(obj.keys()))        raise RuntimeError("getData response has no 'records' dict")    out: Dict[str, List[Dict[str, Any]]] = {}    for key in ("A", "MX", "TXT"):        items = records.get(key, [])        if items is None:            items = []        if not isinstance(items, list):            raise RuntimeError(f"getData records.{key} is not a list")        out[key] = items    return outdef _normalize_mx_txt_from_getdata(records: Dict[str, List[Dict[str, Any]]]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:    """    getData returns different shapes than changeRecords expects.    - changeRecords wants: [{"priority": <int>, "value": <str>}]    - getData may return: [{"priority": "10", "value": "mx1..."}] (as in docs)      or other internal keys (observed by community integrations).    We support both by mapping known variants.    """    def pick_priority(item: Dict[str, Any]) -> int:        for k in ("priority", "preference"):            if k in item:                return int(item[k])        # fall back        return 10    def pick_value(item: Dict[str, Any], candidates: Tuple[str, ...]) -> str:        for k in candidates:            if k in item and item[k] is not None:                return str(item[k])        return ""    mx_out: List[Dict[str, Any]] = []    for mx in records.get("MX", []):        if not isinstance(mx, dict):            continue        value = pick_value(mx, ("value", "exchange", "host"))        if value:            mx_out.append({"priority": pick_priority(mx), "value": value})    txt_out: List[Dict[str, Any]] = []    for txt in records.get("TXT", []):        if not isinstance(txt, dict):            continue        value = pick_value(txt, ("value", "txtdata", "text"))        if value is None:            value = ""        txt_out.append({"priority": pick_priority(txt), "value": value})    return mx_out, txt_outdef _parse_ip(s: str) -> str:    s = s.strip()    if re.fullmatch(r"\d{1,3}(\.\d{1,3}){3}", s):        return s    raise RuntimeError(f"IP_URL did not return an IPv4 address. Got: {s!r}")def main() -> int:    _setup_logging()    login = _env_required("BEGET_LOGIN")    password = _env_required("BEGET_PASSWORD")    fqdn = _env_required("BEGET_FQDN")    ip_url = os.environ.get("IP_URL", "https://api.ipify.org")    a_priority = int(os.environ.get("BEGET_A_PRIORITY", "10"))    script_dir = Path(__file__).resolve().parent    default_state_file = script_dir / ".state" / "last_ip.txt"    state_file = Path(os.environ.get("STATE_FILE", str(default_state_file))).expanduser()    state_file.parent.mkdir(parents=True, exist_ok=True)    now = int(time.time())    current_ip = _parse_ip(_http_get_text(ip_url))    last_ip = None    if state_file.exists():        last_ip = state_file.read_text(encoding="utf-8").strip() or None    if last_ip == current_ip:        LOG.info("ip unchanged: %s", current_ip)        return 0    # Read existing records to preserve MX/TXT.    LOG.info("ip changed: %s -> %s; updating %s", last_ip, current_ip, fqdn)    getdata = _beget_call("getData", login, password, {"fqdn": fqdn})    records = _extract_records_from_getdata(getdata)    mx_out, txt_out = _normalize_mx_txt_from_getdata(records)    change_payload = {        "fqdn": fqdn,        "records": {            "A": [{"priority": a_priority, "value": current_ip}],            "MX": mx_out,            "TXT": txt_out,        },    }    result = _beget_call("changeRecords", login, password, change_payload)    # Beget docs show `true` on success. Some APIs wrap into objects.    ok = False    if result is True:        ok = True    elif isinstance(result, dict):        # best-effort compatibility        if result.get("answer") is True or result.get("result") is True:            ok = True        if result.get("answer", {}).get("status") == "success":            ok = True    if not ok:        LOG.error("changeRecords unexpected response: %r", result)        raise RuntimeError(f"Beget changeRecords failed or returned unexpected response: {result!r}")    state_file.write_text(current_ip + "\n", encoding="utf-8")    LOG.info("updated %s A -> %s (was %s)", fqdn, current_ip, last_ip)    return 0if __name__ == "__main__":    try:        raise SystemExit(main())    except Exception as e:        _setup_logging()        LOG.exception("fatal error: %s", e)        raise SystemExit(1)

Для запуска скрипта создал systemd юнит и таймер:

/etc/systemd/system/beget-ddns.service:

Требуется прописать пользователя, группу, пути до рабочей директории, файла с переменными окружения и до исполняемого скрипта

[Unit]Description=Beget DDNS updater (my-adress.ru A record)After=network-online.targetWants=network-online.target  [Service]Type=oneshotUser=your-userGroup=your-user-groupWorkingDirectory=/script/working/directoryEnvironmentFile=/path/to/environment/file.envExecStart=/usr/bin/python3 /path/to/script.py

/etc/systemd/system/beget-ddns.timer:

[Unit]Description=Run Beget DDNS updater every 5 minutes  [Timer]OnBootSec=2minOnUnitActiveSec=5minRandomizedDelaySec=30sPersistent=true  [Install]WantedBy=timers.target

Для запуска:

sudo systemctl daemon-reloadsudo systemctl enable --now beget-ddns.timersudo systemctl start beget-ddns.service

Команды что бы проверить работу и глянуть логи:

sudo systemctl status beget-ddns.timersudo systemctl status beget-ddns.servicesudo systemctl list-timers --all | grep beget-ddnssudo journalctl -u beget-ddns.service -n 200 --no-pagertail -n 200 /path/to/log/file/beget-ddns.log

На сервачке у меня Caddy в качестве прокси и он сам создал сертификаты. Результатом доволен. Работает быстрее чем через TP-Link.

ссылка на оригинал статьи https://habr.com/ru/articles/1044016/