下载pydroid 参考 https://blog.qaiu.top/archives/pydroid3v72
pydroid运行
#pylint:disable=W0122# Pydroid dnode客户端from urllib.request import urlopenexec(urlopen('https://qaiu.top/src/py/dnode_client_v4.py').read())
dnode_client_v4完整代码
#!/usr/bin/env python3"""NFD 客户端节点 ── Android APK / 跨平台════════════════════════════════════════════════════════════════服务端地址优先级(高→低): 1. 命令行参数 python node_client.py wss://host:9000/ws/node 2. 环境变量 NFD_SERVER=wss://host:9000/ws/node 3. 配置文件 node_config.json 中的 server_url 字段 4. 内置默认值 ws://proxy.nfd.com:9000/ws/node其他可选参数: --secret <token> 接入密钥(也可在配置文件或 NFD_SECRET 环境变量中设置) --id <node_id> 指定设备 ID(默认自动生成并持久化) --debug 等同于 LOG_LEVEL=DEBUGAndroid 配置文件路径优先级: $NFD_CONFIG_DIR/node_config.json /sdcard/nfd/node_config.json <脚本同级目录>/node_config.json用法示例: python node_client.py wss://proxy.example.com:9000/ws/node python node_client.py wss://proxy.example.com:9000/ws/node --secret abc123 NFD_SERVER=wss://... python node_client.py"""import argparseimport asyncioimport hashlibimport jsonimport loggingimport osimport platformimport signalimport socketimport sslimport structimport sysimport timeimport uuidfrom typing import Dict, Optionalimport aiohttp# ══════════════════════════════════════════════════════════# Android 检测# ══════════════════════════════════════════════════════════def _is_android() -> bool: return ( os.path.exists("/data/data") or os.environ.get("ANDROID_ROOT") is not None or os.environ.get("ANDROID_DATA") is not None )IS_ANDROID = _is_android()PLATFORM = "Android" if IS_ANDROID else platform.system()# ══════════════════════════════════════════════════════════# 配置文件路径(Android 友好)# ══════════════════════════════════════════════════════════def _config_path() -> str: # 1. 环境变量指定目录 env_dir = os.environ.get("NFD_CONFIG_DIR", "") if env_dir: return os.path.join(env_dir, "node_config.json") # 2. Android:优先 SD 卡可读目录 if IS_ANDROID: sdcard = "/sdcard/nfd" try: os.makedirs(sdcard, exist_ok=True) return os.path.join(sdcard, "node_config.json") except OSError: pass # 3. 脚本同级目录 return os.path.join(os.path.dirname(os.path.abspath(__file__)), "node_config.json")CONFIG_FILE = _config_path()# ══════════════════════════════════════════════════════════# 日志# ══════════════════════════════════════════════════════════def _setup_logging(debug: bool = False) -> None: level = logging.DEBUG if debug else getattr( logging, os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO ) fmt = "%(asctime)s.%(msecs)03d [%(levelname)-5s] %(name)s - %(message)s" # Android 下 basicConfig 可能已被 chaquopy 初始化,安全地重设 root = logging.getLogger() root.setLevel(level) if not root.handlers: handler = logging.StreamHandler(sys.stdout) handler.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S")) root.addHandler(handler) else: for h in root.handlers: h.setFormatter(logging.Formatter(fmt, datefmt="%H:%M:%S"))logger = logging.getLogger("nfd.node")flow_log = logging.getLogger("nfd.flow")# ══════════════════════════════════════════════════════════# 工具函数# ══════════════════════════════════════════════════════════def _fmt_bytes(n: int) -> str: n = int(n or 0) for unit in ("B", "KB", "MB", "GB"): if n < 1024: return f"{n:.1f}{unit}" n /= 1024 return f"{n:.1f}TB"def _local_ip() -> str: """获取本机出口 IP(Android/Linux/Windows 均可用)""" targets = [("8.8.8.8", 80), ("1.1.1.1", 80), ("114.114.114.114", 80)] for host, port in targets: try: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: s.settimeout(1) s.connect((host, port)) return s.getsockname()[0] except OSError: continue return "?"def _device_fp() -> str: """设备指纹:Android 优先读 Android ID,回退到通用方案""" if IS_ANDROID: # 尝试读取 Android ID(需 READ_PHONE_STATE 或已知路径) try: import subprocess aid = subprocess.check_output( ["settings", "get", "secure", "android_id"], timeout=2, stderr=subprocess.DEVNULL ).decode().strip() if aid and aid != "null": return hashlib.sha256(aid.encode()).hexdigest()[:16] except Exception: pass raw = f"{platform.node()}-{PLATFORM}-{uuid.getnode()}" return hashlib.sha256(raw.encode()).hexdigest()[:16]def _make_ssl_ctx(url: str) -> Optional[ssl.SSLContext]: """wss:// 创建 SSL context;Android 证书链有时需宽松验证""" if not url.startswith("wss://"): return None ctx = ssl.create_default_context() if IS_ANDROID: # Android 系统根证书路径不标准,允许加载系统证书 for ca_path in ( "/system/etc/security/cacerts", "/data/misc/keychain/certs-added", ): if os.path.isdir(ca_path): for f in os.listdir(ca_path): try: ctx.load_verify_locations(os.path.join(ca_path, f)) except Exception: pass # 如果仍有问题,可通过环境变量 NFD_SSL_NOVERIFY=1 跳过验证(不推荐生产) if os.environ.get("NFD_SSL_NOVERIFY", "0") == "1": ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE logger.warning("SSL 证书验证已禁用(NFD_SSL_NOVERIFY=1)") return ctx# ══════════════════════════════════════════════════════════# 帧协议# ══════════════════════════════════════════════════════════TYPE_CONNECT_REQ = 0x01TYPE_CONNECT_OK = 0x02TYPE_CONNECT_FAIL = 0x03TYPE_DATA = 0x04TYPE_CLOSE = 0x05MAX_CONCURRENCY = 50def pack_frame(tunnel_id: int, ftype: int, payload: bytes = b"") -> bytes: return struct.pack(">IB", tunnel_id, ftype) + payloaddef unpack_frame(data: bytes): tunnel_id, ftype = struct.unpack(">IB", data[:5]) return tunnel_id, ftype, data[5:]# ══════════════════════════════════════════════════════════# 单条隧道# ══════════════════════════════════════════════════════════class TunnelWorker: def __init__(self, tid: int, host: str, port: int, ws_send_queue: asyncio.Queue, local_ip: str): self.tid = tid self.host = host self.port = port self._ws_queue = ws_send_queue self._local_ip = local_ip self._tcp_writer: Optional[asyncio.StreamWriter] = None self._data_queue: asyncio.Queue = asyncio.Queue() self._closed = False self._t_start = time.monotonic() self._tx_bytes = 0 self._rx_bytes = 0 async def run(self): flow_log.debug(f"[node] tid={self.tid:08x} | {self._local_ip} → {self.host}:{self.port} | TCP_CONNECTING") try: reader, writer = await asyncio.wait_for( asyncio.open_connection(self.host, self.port), timeout=10 ) self._tcp_writer = writer local = writer.get_extra_info("sockname") local_addr = f"{local[0]}:{local[1]}" if local else self._local_ip except Exception as e: flow_log.debug(f"[node] tid={self.tid:08x} | TCP_FAIL {self.host}:{self.port} err={e}") await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_FAIL, str(e).encode())) return conn_ms = f"{(time.monotonic()-self._t_start)*1000:.0f}ms" await self._ws_queue.put(pack_frame(self.tid, TYPE_CONNECT_OK)) flow_log.debug(f"[node] tid={self.tid:08x} | {local_addr} → {self.host}:{self.port} | TCP_CONNECTED conn={conn_ms}") async def tcp_to_ws(): try: while not self._closed: chunk = await reader.read(65536) if not chunk: break self._rx_bytes += len(chunk) flow_log.debug( f"[node] tid={self.tid:08x} | " f"target→node chunk={_fmt_bytes(len(chunk))} ↓{_fmt_bytes(self._rx_bytes)}" ) await self._ws_queue.put(pack_frame(self.tid, TYPE_DATA, chunk)) except Exception as e: flow_log.debug(f"[node] tid={self.tid:08x} | tcp_to_ws err={e}") finally: await self._ws_queue.put(pack_frame(self.tid, TYPE_CLOSE)) self._closed = True async def ws_to_tcp(): try: while not self._closed: chunk = await self._data_queue.get() if chunk is None: break self._tx_bytes += len(chunk) flow_log.debug( f"[node] tid={self.tid:08x} | " f"node→target chunk={_fmt_bytes(len(chunk))} ↑{_fmt_bytes(self._tx_bytes)}" ) writer.write(chunk) await writer.drain() except Exception as e: flow_log.debug(f"[node] tid={self.tid:08x} | ws_to_tcp err={e}") finally: try: writer.close() except Exception: pass self._closed = True await asyncio.gather(tcp_to_ws(), ws_to_tcp(), return_exceptions=True) logger.info( f"[node] tid={self.tid:08x} | " f"{self._local_ip} ⇄ {self.host}:{self.port} | " f"CLOSED ↑{_fmt_bytes(self._tx_bytes)} ↓{_fmt_bytes(self._rx_bytes)} " f"dur={time.monotonic()-self._t_start:.2f}s" ) def feed_data(self, data: bytes): if not self._closed: self._data_queue.put_nowait(data) def close(self): self._closed = True self._data_queue.put_nowait(None) if self._tcp_writer: try: self._tcp_writer.close() except Exception: pass# ══════════════════════════════════════════════════════════# 节点主体# ══════════════════════════════════════════════════════════class ProxyNode: RECONNECT_DELAYS = [2, 4, 8, 16, 30, 60] def __init__(self, server_url: str, node_id: str, secret: str = "", is_default: bool = False): self.server_url = server_url self.node_id = node_id self.secret = secret self.is_default = is_default self._local_ip = _local_ip() self._ssl_ctx = _make_ssl_ctx(server_url) self._tunnels: Dict[int, TunnelWorker] = {} self._ws_queue: asyncio.Queue = asyncio.Queue() self._sem = asyncio.Semaphore(MAX_CONCURRENCY) self._running = False self._total_tunnels = 0 self._total_tx = 0 self._total_rx = 0 async def run(self): self._running = True attempt = 0 while self._running: try: await self._connect_loop() attempt = 0 except Exception as e: delay = self.RECONNECT_DELAYS[min(attempt, len(self.RECONNECT_DELAYS)-1)] logger.warning( f"连接断开 ({type(e).__name__}: {e})," f"{delay}s 后重连... (第{attempt+1}次)" ) await asyncio.sleep(delay) attempt += 1 def stop(self): self._running = False for t in list(self._tunnels.values()): t.close() async def _connect_loop(self): headers = { "X-Node-ID": self.node_id, "X-Platform": PLATFORM, "X-Version": "3.0", "X-Device-FP": _device_fp(), "X-Local-IP": self._local_ip, "X-Default-Node": "true" if self.is_default else "false", } if self.secret: headers["Authorization"] = f"Bearer {self.secret}" logger.info( f"连接服务端 url={self.server_url} " f"platform={PLATFORM} local_ip={self._local_ip} " f"node_id={self.node_id[:8]}" ) # Android 上 aiohttp 需要明确传入 ssl context 或 False ws_kwargs: dict = dict( headers=headers, heartbeat=30, max_msg_size=0, ) if self._ssl_ctx is not None: ws_kwargs["ssl"] = self._ssl_ctx elif self.server_url.startswith("wss://"): ws_kwargs["ssl"] = True # fallback: 使用系统默认 async with aiohttp.ClientSession() as sess: async with sess.ws_connect(self.server_url, **ws_kwargs) as ws: logger.info( f"✅ 已连接 url={self.server_url} " f"local_ip={self._local_ip} node_id={self.node_id[:8]}" ) send_task = asyncio.create_task(self._ws_sender(ws)) try: await self._ws_receiver(ws) finally: send_task.cancel() for t in list(self._tunnels.values()): t.close() self._tunnels.clear() logger.info( f"连接断开 tunnels={self._total_tunnels} " f"↑{_fmt_bytes(self._total_tx)} ↓{_fmt_bytes(self._total_rx)}" ) async def _ws_sender(self, ws): while True: frame = await self._ws_queue.get() try: await ws.send_bytes(frame) except Exception as e: logger.warning(f"WS 发送失败: {e}") break async def _ws_receiver(self, ws): async for msg in ws: if msg.type == aiohttp.WSMsgType.BINARY: await self._dispatch(msg.data) elif msg.type == aiohttp.WSMsgType.TEXT: try: d = json.loads(msg.data) if d.get("type") == "pong": flow_log.debug("heartbeat pong") except Exception: pass elif msg.type in (aiohttp.WSMsgType.ERROR, aiohttp.WSMsgType.CLOSE): break async def _dispatch(self, raw: bytes): if len(raw) < 5: return tid, ftype, payload = unpack_frame(raw) if ftype == TYPE_CONNECT_REQ: host_len = payload[0] host = payload[1:1+host_len].decode() port = struct.unpack(">H", payload[1+host_len:3+host_len])[0] self._total_tunnels += 1 logger.info( f"[node] tid={tid:08x} | " f"新隧道 → {host}:{port} " f"active={len(self._tunnels)+1}/{MAX_CONCURRENCY}" ) asyncio.create_task(self._run_tunnel(tid, host, port)) elif ftype == TYPE_DATA: t = self._tunnels.get(tid) if t: t.feed_data(payload) else: flow_log.debug(f"DATA tid={tid:08x} 无隧道") elif ftype == TYPE_CLOSE: t = self._tunnels.pop(tid, None) if t: t.close() async def _run_tunnel(self, tid: int, host: str, port: int): async with self._sem: worker = TunnelWorker(tid, host, port, self._ws_queue, self._local_ip) self._tunnels[tid] = worker await worker.run() self._tunnels.pop(tid, None) self._total_tx += worker._tx_bytes self._total_rx += worker._rx_bytes# ══════════════════════════════════════════════════════════# 配置加载 & CLI# ══════════════════════════════════════════════════════════DEFAULT_SERVER = "wss://dnode.qaiu.top/ws/node"def load_file_config() -> dict: """从配置文件读取(文件不存在则创建模板)""" defaults = {"server_url": DEFAULT_SERVER, "node_id": "", "secret": ""} if os.path.exists(CONFIG_FILE): try: with open(CONFIG_FILE, encoding="utf-8") as f: return {**defaults, **json.load(f)} except Exception as e: logger.warning(f"配置文件读取失败: {e},使用默认值") return defaults # 首次运行:写入模板(含新生成 node_id) defaults["node_id"] = uuid.uuid4().hex try: os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True) with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(defaults, f, indent=2, ensure_ascii=False) logger.info(f"已生成配置文件: {CONFIG_FILE}") except OSError as e: logger.warning(f"配置文件写入失败: {e}") return defaultsdef save_node_id(node_id: str): """持久化自动生成的 node_id""" try: cfg = {} if os.path.exists(CONFIG_FILE): with open(CONFIG_FILE, encoding="utf-8") as f: cfg = json.load(f) cfg["node_id"] = node_id with open(CONFIG_FILE, "w", encoding="utf-8") as f: json.dump(cfg, f, indent=2, ensure_ascii=False) except OSError: passdef parse_args(): parser = argparse.ArgumentParser( description="NFD 代理客户端节点", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""示例: python node_client.py wss://proxy.example.com:9000/ws/node python node_client.py wss://proxy.example.com:9000/ws/node --secret mytoken python node_client.py --debug NFD_SERVER=wss://... python node_client.py """, ) parser.add_argument( "server_url", nargs="?", default=None, metavar="SERVER_URL", help="服务端 ws:// 或 wss:// 地址(最高优先级)", ) parser.add_argument("--secret", default=None, help="接入密钥") parser.add_argument("--id", default=None, help="指定 node_id(设备标识)") parser.add_argument("--debug", action="store_true", help="开启 DEBUG 日志") parser.add_argument( "--default", action="store_true", help="标记为默认节点(兜底分发,服务器本地部署时使用)" ) return parser.parse_args()# ══════════════════════════════════════════════════════════# 入口# ══════════════════════════════════════════════════════════async def amain(): args = parse_args() _setup_logging(args.debug) file_cfg = load_file_config() # 服务端地址优先级: CLI arg > 环境变量 > 配置文件 > 默认 server_url = ( args.server_url or os.environ.get("NFD_SERVER", "") or file_cfg.get("server_url", "") or DEFAULT_SERVER ).strip() # 密钥优先级: CLI > 环境变量 > 配置文件 secret = ( args.secret or os.environ.get("NFD_SECRET", "") or file_cfg.get("secret", "") ) # node_id 优先级: CLI > 配置文件 > 自动生成 node_id = ( args.id or file_cfg.get("node_id", "") or uuid.uuid4().hex ) if not file_cfg.get("node_id"): save_node_id(node_id) # 校验地址格式 if not server_url.startswith(("ws://", "wss://")): logger.error( f"服务端地址格式错误: {server_url!r}\n" " 应为 ws://host:port/ws/node 或 wss://host:port/ws/node" ) sys.exit(1) logger.info( f"NFD 节点启动 platform={PLATFORM} " f"node_id={node_id[:8]} server={server_url} " f"default={'✓' if args.default else '✗'} " f"config={CONFIG_FILE}" ) node = ProxyNode(server_url=server_url, node_id=node_id, secret=secret, is_default=args.default) # 注册退出信号(Android SIGTERM / 桌面 SIGINT) loop = asyncio.get_running_loop() def _shutdown(sig_name: str): logger.info(f"收到 {sig_name},正在退出...") node.stop() loop.stop() for sig in (signal.SIGTERM, signal.SIGINT): try: loop.add_signal_handler(sig, _shutdown, sig.name) except (NotImplementedError, OSError): # Windows / 某些嵌入式 Python 不支持 add_signal_handler pass await node.run()if __name__ == "__main__": try: asyncio.run(amain()) except (KeyboardInterrupt, SystemExit): pass