通过dnsmasq 增加静态解析实现
import base64
import json
import os
import time
import syslog
import requests
# 全局变量上次服务器IP
last_server_ip = []
# 解析Shadowsocks链接函数
def parse_ss_link(link):
ss_encoded_str = link[5:].split('#')[0]
ss_encoded_server = link[5:].split('#')[1].split('@')[1].split(':')[0]
ss_decoded_bytes = base64.b64decode(ss_encoded_str + '==')
ss_decoded_str = ss_decoded_bytes.decode('utf-8')
ss_server_ip = ss_decoded_str.split('@')[1].split(':')[0]
ss_server_name = ss_decoded_str.split('.')[0]
return ss_server_name, ss_encoded_server, ss_server_ip
# 解析vmess链接函数,反回域名和ip地址
def parse_vmess_link(link):
vmess_encoded_str = link[8:]
vmess_encoded_str_padded = vmess_encoded_str + '=='
vmess_decoded_bytes = base64.urlsafe_b64decode(vmess_encoded_str_padded)
vmess_decoded_str = vmess_decoded_bytes.decode('utf-8')
vmess_server_ip = json.loads(vmess_decoded_str)['add']
vmess_server_domain = json.loads(vmess_decoded_str)['ps'].split('@')[1].split(':')[0]
vmess_server_name = vmess_server_domain.split('.')[0]
return vmess_server_name, vmess_server_domain, vmess_server_ip
def get_jms_config():
url = 'You subscription url'
response = requests.get(url)
if response.status_code == 200:
decoded_content = base64.b64decode(
response.content.strip()).decode('utf_8')
vmess_links = decoded_content.strip().split('\n')
return vmess_links
else:
return None
def get_jms_config_test():
decoded_content = base64.b64decode(
''.strip()).decode(
'utf_8')
vmess_links = decoded_content.strip().split('\n')
return vmess_links
def run():
global last_server_ip
dnsmasq_entry_array = []
server_list = []
try:
syslog.openlog('jms-dns-update', syslog.LOG_PID, syslog.LOG_USER)
vmess_links = get_jms_config()
except Exception as e:
syslog.syslog(f"Failed to get JMS config: {e}")
return
# 遍历vmess链接
for link in vmess_links:
domain_ip = None
if link.startswith('ss://'):
domain_ip = parse_ss_link(link)
elif link.startswith('vmess://'):
domain_ip = parse_vmess_link(link)
if domain_ip is not None:
server_list.append(domain_ip)
dnsmasq_entry = f"address=/{domain_ip[1]}/{domain_ip[2]}"
dnsmasq_entry_array.append(dnsmasq_entry)
# 检查是否有变化深度拷贝dnsmasq_entry_array 与 last_server_ip 比较
if server_list == last_server_ip:
syslog.syslog("No changes in server IP, skipping...")
elif dnsmasq_entry_array != last_server_ip:
syslog.syslog("Server IP has changed, updating...")
last_server_ip = server_list.copy()
# 配置文件路径
dnsmasq_conf_file = '/etc/dnsmasq.d/jms.host.conf'
# 写入配置文件
with open(dnsmasq_conf_file, 'w') as f:
for entry in dnsmasq_entry_array:
f.write(entry + '\n')
# syslog输出dnsmasq_entry_array元素
for i in dnsmasq_entry_array:
syslog.syslog(f"Added to dnsmasq: {i}")
# 重启dnsmasq
syslog.syslog("Restarting dnsmasq...")
restart_dnsmasq = 'systemctl restart dnsmasq'
os.system(restart_dnsmasq)
syslog.syslog("Done!")
syslog.syslog("waiting for 10 minutes")
# 每分钟执行一次run函数
while 1 == 1:
run()
time.sleep(60 * 10)