DDNS 动态域名系统原理浅析

40

DDNS(Dynamic Domain Name System)在需要远程访问家庭网络设备或搭建服务器等场景中非常有用。今天我们来讨论一下 DDNS 的实现过程。我最近将域名 DNS 解析迁移到了火山引擎 TrafficRoute DNS 套件(以下简称TRDNS),发现市面上没有针对 TRDNS 的动态域名解析工具,因此我决定自己动手实现一个,并在这里记录下这个过程。

必备材料

  • 购买一个域名(可在阿里云、腾讯云、华为云等服务商购买,注意长期在国内使用需进行工信部ICP备案和网安备案)

  • 使用DNS解析API(以下简称【设置接口】,由DNS服务商提供,默认情况下在购买域名的服务商提供。在购买域名之前,确保服务器提供此服务,大多数服务商都有)

  • 家庭宽带具有动态IP(没有动态IP无法使用DDNS,IPv4 和 IPv6 的 DDNS 解析过程都是一样的,除了服务商解析API可能会有区别,下文我依IPv4为例)

  • 熟练掌握一门编程语言(本文使用Python和Groovy(Java),读者可自行选择其他编程语言)

  • 使用免费IP信息查询API接口(以下简称【查询接口】,可在此GitHub仓库中查找:https://github.com/ihmily/ip-info-api)

DDNS 实现原理

我直接用通俗易懂的语言来解释整个代码的实现过程,相信看到这里的同学都能够听得明白。

我们会使用定时任务不断地自动查询接口,获取当前的公网IP地址,并将其记录在本地。每次查询后,我们会将新的IP地址与上一次保存的IP地址进行比对,如果不一致,就说明IP发生了变化,这时候我们需要通过DNS解析API将新的IP地址解析到对应的域名上。

整个DDNS的运行过程其实非常简单,我们常用的DDNS客户端ddns-go和ALIDDNS都是基于这个原理实现的。

下文将根据以下几点来说明实现过程:

  1. 获取当前的公网IP地址

  2. 使用定时任务

  3. 判断IP是否发生变化

  4. 将新的IP地址解析到域名上

获取当前的公网 IP 地址

大多数查询接口都是使用 GET 请求。为了确定接口返回的是公网 IPv4 还是 IPv6,我们可以直接在浏览器中打开接口并查看返回的 IP。但是,有些特殊情况,比如 https://ipinfo.io,使用浏览器打开是可视化的网页,使用接口方式访问直接返回 IPv4 地址。因此,最好使用编程发起请求。

以下是一个 Python 实例,用于获取当前公网 IP 地址:

import requests

def get_pub_ip():
    try:
        response = requests.get('https://ipinfo.io', timeout=2)
        response.raise_for_status()
        return response.json().get('ip')
    except Exception as e:
        return None

这样我们就成功获取了公网 IP 地址。为了确保稳定性,建议在实际使用中从多个查询接口中获取公网 IP,以免受某个查询接口临时故障的影响。

使用定时任务

定时任务的主要目的是为了定时循环监控公网IP的变化。一般来说,我们会设置时间间隔,建议以分钟为单位进行设置。

实现定时任务的方式有很多种,可以使用编程框架提供的定时任务。但一般情况下,这些定时任务需要持续在后台运行。个人认为即用即走的方案更为便捷,例如Linux系统自带的crontab或Windows系统自带的任务计划程序等。我的DDNS实现只有一个Python脚本,正好也是在Linux系统上,因此我选择使用crontab。大家可以根据自己的情况选择适合自己的方式进行设置。

# 每三分钟检查一次IP变化
*/3 * * * * sudo -u root /usr/bin/python3 /ddns/ddns.py

判断 IP 是否发生变化

需要将查询到的公网IP保存起来,最简单的方式是将其存储在本地文件中。下次执行任务时,可以将查询到的IP与之前保存的IP进行比对,以判断是否发生了变化。这个步骤非常简单:

import requests
import sys

def get_pub_ip():
    try:
        response = requests.get('https://ipinfo.io', timeout=2)
        response.raise_for_status()
        return response.json().get('ip')
    except Exception as e:
        return None

# IPV4地址保存文件
ipv4_file = '/ddns/ipv4.txt'

# 读取IPV4文件
def read_ipv4_file():
    try:
        with open(ipv4_file, 'r') as file:
            return file.read()
    except FileNotFoundError:
        return None

# 写入IPV4文件
def save_ipv4_file(ip):
    try:
        with open(ipv4_file, 'w') as file:
            file.write(ip)
    except Exception as e:
        print(f"写入文件时发生错误: {e}")

if __name__ == '__main__':
    # 获取当前IP
    ipv4 = get_pub_ip()
    # 获取旧IP
    old_ipv4 = read_ipv4_file()
    # 比对IP是否变更
    if old_ipv4 == ipv4:
        sys.exit()
    # 保存变更后的IP
    save_ipv4_file(ipv4)

将新的IP地址解析到域名上

不同DNS服务商的设置接口可能不同,因此我的示例代码可能无法适用于所有人。然而,基本思路是一致的,需要大家灵活运用。我们需要查阅服务商的云解析DNS文档,了解其API列表和调用方式。需要关注的API包括:获取域名信息、添加解析记录、删除解析记录和更新解析记录。按照我的步骤,只需关注这几项API。当然,更推荐通读整个API文档,以获取更全面有效的信息。

TrafficRoute DNS 套件文档地址:https://www.volcengine.com/docs/6758/155086

我使用 Groovy 脚本编写了这部分代码,因为我对 Java 更熟悉。由于需要与上面的 Python 代码结合使用,而且我习惯即用即走的设计方式,因此这部分代码被设计成可独立使用的部分。Python 通过命令行 subprocess 的方式进行调用与结合。

Groovy 部分代码:

import groovy.json.JsonOutput
import groovy.json.JsonSlurper

import javax.crypto.Mac
import javax.crypto.spec.SecretKeySpec
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.security.MessageDigest
import java.time.ZoneId
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.util.logging.Level
import java.util.logging.Logger

import jp.co.osstech.regdom4j.RegDomain

//校验参数数量
if(args.length < 2) {
  Logger.global.log(Level.SEVERE, "Usage: script.groovy <domain_name> <new_ip>")
  System.exit(0)
}

//获取参数
def DOMAIN_NAME = args[0]
def DOMAIN = new RegDomain().getRegisteredDomain(DOMAIN_NAME);
def HOST = DOMAIN_NAME.substring(0, DOMAIN_NAME.lastIndexOf(DOMAIN)-1)
def NEW_IP = args[1]

//获取域名记录ZID
def zid = getZID(DOMAIN);
//获取域名解析记录
def recordId = getRecordID(zid, DOMAIN_NAME);
if(recordId==null || recordId.isBlank()) {
  //创建IP解析
  createDNS(zid, NEW_IP, HOST);
}else{
  //更新IP解析
  updateDNS(recordId, NEW_IP, HOST);
}

//=======================================================

static void updateDNS(String recordId, String ip, String host) throws URISyntaxException, IOException, InterruptedException {
  def body = JsonOutput.toJson([
    Host: host,
    Line: "default",
    RecordID: recordId,
    TTL: 600,
    Type: "A",
    Value: ip,
    Weight: 1
  ]).toString().bytes
  var result = request("POST", [:], [:], "UpdateRecord", body)
  if(result.RecordID == recordId){
    Logger.global.log(Level.WARNING, "Update DNS successfully")
  }else{
    Logger.global.log(Level.SEVERE, "Update DNS failed")
  }
}

static void createDNS(Long zid, String ip, String host) {
  def body = JsonOutput.toJson([
    Host: host,
    Line: "default",
    TTL: 600,
    Type: "A",
    Value: ip,
    ZID: zid
  ]).bytes
  var result = request("POST", [:], [:], "CreateRecord", body);
  if(result.Host == host){
    Logger.global.log(Level.WARNING, "Create DNS successfully");
  }else{
    Logger.global.log(Level.WARNING, "Create DNS failed");
  }
}

static String getRecordID(Long zid, String DOMAIN_NAME) {
  var query = [ZID: zid, PageSize: 500];
  var result = request("GET", query, [:], "ListRecords",[] as byte[]);
  def record = result.Records?.find { r -> DOMAIN_NAME == (r as Map).PQDN } as Map
  return record ? record.RecordID.toString() : null
}

static Long getZID(String domain) {
  def query = [PageSize: 500];
  def result = request("GET", query, [:], "ListZones", [] as byte[])
  def zone = result.Zones?.find { z -> domain == (z as Map).ZoneName } as Map
  return zone?.ZID as Long;
}

//=====================================================================

/**
 * sha256非对称加密
 */
static byte[] hmacSHA256(byte[] key, String content) {
  Mac mac = Mac.getInstance("HmacSHA256");
  mac.init(new SecretKeySpec(key, "HmacSHA256"));
  return mac.doFinal(content.getBytes());
}

/**
 * sha256 hash算法
 */
static String hashSHA256(byte[] content) {
  MessageDigest md = MessageDigest.getInstance("SHA-256");
  return encodeHexStr(md.digest(content));
}

/**
 * 将字节数组转换为十六进制字符串
 * @param byteArray 字节数组
 * @return 十六进制字符串
 */
static String encodeHexStr(byte[] byteArray) {
  StringBuilder hexString = new StringBuilder();
  for (byte b : byteArray) {
    hexString.append(String.format("%02x", b));
  }
  return hexString.toString();
}

/**
 * 发送请求
 */
static Map<?, ?> request(String method, Map<String, Object> query, Map<String, String> header, String action, byte[] body) {
  def contentType = "application/json"
  def host = "open.volcengineapi.com"
  def path = "/"
  // 初始化身份证明
  var credential = [
    "accessKeyId": "AK",
    "secretKeyId": "SK",
    "service": "DNS",
    "region": "cn-north-1"
  ]

  //计算签名
  def queryList = new ArrayList<>(query.entrySet())
  queryList.sort { it.key }
  def pairs = queryList.collect { "${it.key}=${it.value}" } as List<String>
  pairs.add("Action=${action}")
  pairs.add("Version=2018-08-01")
  //按参数名称对查询参数进行升序排序
  pairs.sort { it.split('=')[0] }
  // 初始化签名结构
  var uri = new URI("https", host, path, String.join("&", pairs), null)
  // 接下来开始计算签名
  var formatter = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmss'Z'").withZone(ZoneId.of("UTC"))
  var zonedDateTime = ZonedDateTime.now(ZoneId.of("UTC"))
  var xDate = formatter.format(zonedDateTime)
  var shortXDate = xDate.substring(0, 8)
  var xContentSha256 = hashSHA256(body)
  //计算
  def headStr = ["content-type", "host", "x-content-sha256", "x-date"]
  var signedHeadersStr = headStr.join(';')
  def headStrSecond = ["content-type:$contentType", "host:$host", "x-content-sha256:$xContentSha256", "x-date:$xDate"]
  var preRequestStr = headStrSecond.join('\n')
  var preCanonicalRequestStr = [method, path, uri.getRawQuery(), preRequestStr, "", signedHeadersStr, xContentSha256]
  var canonicalRequestStr = preCanonicalRequestStr.join('\n')
  var hashedCanonicalRequest = hashSHA256(canonicalRequestStr.getBytes())
  var credentialStr = [shortXDate, credential.get("region"), credential.get("service"), "request"]
  var credentialScope = credentialStr.join("/")
  var preStringToSign = ["HMAC-SHA256", xDate, credentialScope, hashedCanonicalRequest]
  var stringToSign = preStringToSign.join("\n")
  var kDate = hmacSHA256(credential.get("secretKeyId").getBytes(), shortXDate)
  var kRegion = hmacSHA256(kDate, credential.get("region"))
  var kService = hmacSHA256(kRegion, credential.get("service"))
  var kSigning = hmacSHA256(kService, "request")
  var signature = encodeHexStr(Objects.requireNonNull(hmacSHA256(kSigning, stringToSign)))
  def authorization = "HMAC-SHA256 Credential=${credential.get("accessKeyId")}/${credentialScope}, SignedHeaders=${signedHeadersStr}, Signature=${signature}"
  //构建请求头
  var requestBuilder = HttpRequest.newBuilder().uri(uri)
          .header("Content-Type", contentType)
          .header("X-Date", xDate)
          .header("X-Content-Sha256", xContentSha256)
          .header("Authorization", authorization);
  header.forEach(requestBuilder::header);
  //构建请求
  var request = switch (method.toUpperCase()) {
    case "POST" -> requestBuilder.POST(HttpRequest.BodyPublishers.ofByteArray(body)).build()
    case "GET" -> requestBuilder.GET().build()
    default -> throw new UnsupportedOperationException()
  }
  //发送请求
  var response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
  //解析结果
  def result = new JsonSlurper().parseText(response.body()) as Map<String, ?>
  if(!result.containsKey("Result")){
    def error = (result.ResponseMetadata as Map).Error as Map
    //直接抛出错误
    throw new RuntimeException("$error.MessageCN.$error.Message")
  }
  return result.Result  as Map<String, ?>
}

Python部分代码:

# 更新DNS
result = subprocess.run(['groovy', '/ddns/script.groovy', 'www.xyz.cn', ipv4], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    if result.returncode == 0:
        logging.info(f"更新DDNS成功:{ipv4}")
    else:
        error = result.stderr
        logging.error(f"更新DDNS失败:{ipv4} - {error}")
        if result.stdout:
            logging.error(f"命令标准输出: {result.stdout}")

结语

这样就实现了一个简单的DDNS客户端工具,整个步骤也比较简单,主要复杂在研究DNS服务商云解析API上。当然,以上步骤中的代码也有很多不合理的地方或者存在BUG,主要是为了说明流程。你可以根据自己的想法进行补充修改,比如增加当IP发生变化时通过微信发送通知等功能。

需要补充一点,定时任务的间隔直接影响了IP发生变化时更新DNS解析的速度,但也不是唯一的变量,因为DNS解析需要一定的生效时间。

感谢大家观看,如果不妥支持清谅解,有喜欢的部分可以进行讨论,我会考虑展开细说。