"""PyDownloader: an interactive, multi-threaded HTTP file downloader.

The script shows a console menu, splits a remote file into byte ranges that
are fetched by parallel worker threads, merges the parts and falls back to a
single-threaded download (and finally the web browser) when that fails.
"""

import codecs
import copy
import importlib
import json
import logging
import os
import platform
import queue
import signal
import subprocess
import sys
import threading
import time
import traceback
import webbrowser
from pathlib import Path
from shutil import copyfileobj, disk_usage, move, rmtree
from urllib.parse import urlparse, urlsplit


class _PlainStyle:
    """Stand-in for colorama's ``Fore``/``Style`` that renders codes as ''."""

    def __getattr__(self, name):
        if name.startswith("__"):
            raise AttributeError(name)
        return ""


# Third-party modules are imported defensively so that
# check_and_install_modules() gets a chance to install them; a hard import
# failure here would abort the script before that check could ever run.
try:
    import requests
except ImportError:
    requests = None

try:
    from colorama import Fore, Style, init
except ImportError:
    Fore = Style = _PlainStyle()

    def init(*args, **kwargs):
        """No-op replacement used while colorama is not installed."""


# Initialise colorama.
init(autoreset=True)

# Module dependencies: import name -> pip package name.
REQUIRED_MODULES = {
    "requests": "requests",
    "colorama": "colorama",
    "shutil": "shutil"  # make sure shutil is listed explicitly
}

# Path of the configuration file.
CONFIG_FILE = os.path.join(os.path.expanduser("~"), "pydownloader.conf")

# Default configuration.
DEFAULT_CONFIG = {
    "num_threads": 32,
    "download_dir": str(Path(os.environ.get("USERPROFILE", os.path.expanduser("~"))) / "Downloads"),
    "advanced_config": {
        "chunk_size": 8192,
        "timeout": 10,
        "retry_delay": 5,  # delay between retries (seconds)
        "thread_sync": False  # restart a worker as soon as it has failed
    },
    "extra_config": {
        "max_retries": 3,  # maximum number of retries
        "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"  # User-Agent
    }
}

# Settings actually used by the download functions; replaced by
# _activate_config() with the user's configuration before a download starts.
_runtime_config = copy.deepcopy(DEFAULT_CONFIG)

_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
_SEPARATOR = "-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------"
_BAR_LENGTH = 50


# Display strings for the state of a worker thread.
class ThreadStatus:
    RUNNING = Fore.GREEN + "运行中" + Style.RESET_ALL
    ERROR = Fore.RED + "错误" + Style.RESET_ALL
    STOPPED = "已停止"


def _setting(section, key):
    """Return one value of the currently active runtime configuration."""
    return _runtime_config[section][key]


def _activate_config(config):
    """Make *config* the configuration used by subsequent downloads."""
    global _runtime_config
    _runtime_config = copy.deepcopy(config)


def _bind_optional_module(module_name, module):
    """Re-bind module globals that fell back to placeholders at import time."""
    global requests, Fore, Style
    if module_name == "requests" and requests is None:
        requests = module
    elif module_name == "colorama" and isinstance(Fore, _PlainStyle):
        # ThreadStatus labels were built before colorama existed and stay
        # uncoloured until the next start of the program.
        Fore, Style = module.Fore, module.Style
        module.init(autoreset=True)


def check_and_install_modules():
    """Verify that every required module is importable, offering to install it.

    Exits the process with status 1 when a module is missing and the user
    declines the installation or the installation fails.
    """
    for module_name, package_name in REQUIRED_MODULES.items():
        try:
            module = importlib.import_module(module_name)
            print(f"{Fore.GREEN}模块 '{module_name}' 已安装.{Style.RESET_ALL}")
        except ImportError:
            print(f"{Fore.RED}模块 '{module_name}' 未安装.{Style.RESET_ALL}")
            install = input(f"是否自动安装 '{module_name}'? (y/n): ").strip().lower()
            if install != 'y':
                print(f"{Fore.RED}程序需要模块 '{module_name}',请手动安装后重试.{Style.RESET_ALL}")
                sys.exit(1)
            try:
                subprocess.check_call([sys.executable, "-m", "pip", "install", package_name])
                # Let the import system notice the freshly installed package.
                importlib.invalidate_caches()
                module = importlib.import_module(module_name)
                print(f"{Fore.GREEN}模块 '{module_name}' 已成功安装.{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}安装 '{module_name}' 失败: {e}{Style.RESET_ALL}")
                sys.exit(1)
        _bind_optional_module(module_name, module)


def setup_logging():
    """Send INFO-level logging to a fresh timestamped file in the temp dir.

    Existing root handlers are replaced, so each download gets its own log.

    Returns:
        The path of the log file.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    log_dir = os.path.join(os.environ.get("TEMP", "/tmp"), "downloader")
    os.makedirs(log_dir, exist_ok=True)
    log_file_path = os.path.join(log_dir, f"{timestamp}.log")
    logging.basicConfig(
        level=logging.INFO,
        format=_LOG_FORMAT,
        filename=log_file_path,
        filemode="w",
        encoding='utf-8',
        force=True,  # without this a repeated call is silently ignored
    )
    # Pin the level on the handler so that lowering the root level for the
    # full log does not leak DEBUG records into this file.
    for handler in logging.getLogger().handlers:
        handler.setLevel(logging.INFO)
    return log_file_path


def setup_full_log(file_name):
    """Additionally record all log levels in a file next to this script.

    A dedicated handler is attached because a second ``basicConfig`` call
    does nothing once the root logger already has handlers.

    Returns:
        The path of the full log file.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    log_dir = os.path.dirname(os.path.abspath(__file__))  # directory of this script
    full_log_file_path = os.path.join(log_dir, f"{timestamp}-{file_name}fulllog.log")
    handler = logging.FileHandler(full_log_file_path, mode="w", encoding='utf-8')
    handler.setLevel(logging.DEBUG)  # record every level
    handler.setFormatter(logging.Formatter(_LOG_FORMAT))
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.DEBUG)
    root_logger.addHandler(handler)
    return full_log_file_path


def get_default_download_path():
    """Return the default download directory."""
    return DEFAULT_CONFIG["download_dir"]


def validate_url(url):
    """Return True when *url* starts with ``http://`` or ``https://``."""
    return url.startswith(('http://', 'https://'))


def add_protocol_if_missing(url):
    """Prefix *url* with ``http://`` unless it already has an HTTP(S) scheme."""
    if not url.startswith(('http://', 'https://')):
        return 'http://' + url
    return url


def extract_hostname(url):
    """Return the bare host name of *url* (no port or credentials).

    Returns None when the URL has no host or cannot be parsed.
    """
    try:
        return urlparse(url).hostname
    except ValueError:
        return None


def check_server_availability(url, timeout=2):
    """Ping the host of *url* once.

    Args:
        url: URL whose host is probed.
        timeout: Seconds to wait for the reply.

    Returns:
        A ``(reachable, message)`` tuple; *message* is the ping output or an
        error description.
    """
    hostname = extract_hostname(url)
    # A leading '-' would be parsed by ping as an option, not as a host.
    if not hostname or hostname.startswith("-"):
        return False, "Invalid URL format"

    system = platform.system().lower()
    if system == "windows":
        command = ["ping", "-n", "1", "-w", str(int(timeout * 1000)), hostname]
    elif system == "darwin":
        # macOS expects the -W wait time in milliseconds.
        command = ["ping", "-c", "1", "-W", str(int(timeout * 1000)), hostname]
    else:
        command = ["ping", "-c", "1", "-W", str(timeout), hostname]

    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout + 5)
    except FileNotFoundError:
        return False, "Ping 命令未找到,请检查您的系统配置。"
    except Exception as e:
        return False, str(e)

    if system == "windows":
        # Windows may exit with 0 for "destination unreachable"; a TTL field
        # is only present when a real reply arrived.
        reachable = "TTL=" in result.stdout
    else:
        reachable = result.returncode == 0
    if reachable:
        return True, result.stdout
    # ping usually reports failures on stdout, so fall back to it.
    return False, result.stderr or result.stdout


def format_file_size(size_in_bytes):
    """Format a byte count as ``"<scaled><unit> (<bytes>)"``.

    The value in parentheses is always the original, unscaled byte count.
    """
    scaled = size_in_bytes
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if scaled < 1024.0:
            return f"{scaled:.2f}{unit} ({size_in_bytes})"
        scaled /= 1024.0
    return f"{scaled:.2f}PB ({size_in_bytes})"


def get_file_info(url, timeout=DEFAULT_CONFIG["advanced_config"]["timeout"], is_single_thread=False):
    """Query size and Range support of *url* with a HEAD request.

    Args:
        url: The file URL.
        timeout: Request timeout in seconds.
        is_single_thread: When True, Range support is reported as False.

    Returns:
        ``(size_in_bytes, supports_range)``.

    Raises:
        Exception: On HTTP errors or when Content-Length is missing/invalid.
    """
    try:
        headers = {'User-Agent': _setting("extra_config", "user_agent")}
        r = requests.head(url, allow_redirects=True, timeout=timeout, headers=headers)
        r.raise_for_status()
        if 'Content-Length' not in r.headers:
            raise Exception("该文件没有 Content-Length,可能无法分段下载")
        size = int(r.headers['Content-Length'])
        accepts_bytes = r.headers.get('Accept-Ranges', 'none').strip().lower() == 'bytes'
        # A single-threaded download ignores Range support.
        support_range = accepts_bytes and not is_single_thread
        return size, support_range
    except requests.exceptions.RequestException as e:
        raise Exception(f"HTTP 请求错误: {e}") from e
    except Exception as e:
        raise Exception(f"获取文件信息失败: {e}") from e


def check_disk_space(file_size, download_dir):
    """Check that *download_dir* has room for *file_size* bytes.

    Returns:
        ``(ok, message)``; *message* is empty when there is enough space.
    """
    try:
        free_space = disk_usage(download_dir).free
        if file_size > free_space:
            return False, "磁盘空间不足"
        return True, ""
    except Exception as e:
        return False, f"无法检查磁盘空间: {e}"


def compute_part_ranges(file_size, num_threads):
    """Split ``file_size`` bytes into inclusive ``(start, end)`` byte ranges.

    At most *num_threads* ranges are produced and never more than there are
    bytes, so no range is empty; the last range absorbs the remainder.
    Returns an empty list for a non-positive *file_size*.
    """
    if file_size <= 0:
        return []
    count = max(1, min(num_threads, file_size))
    part_size = file_size // count
    return [
        (i * part_size, file_size - 1 if i == count - 1 else (i + 1) * part_size - 1)
        for i in range(count)
    ]


def download_part(url, start, end, index, file_path,
                  chunk_size=DEFAULT_CONFIG["advanced_config"]["chunk_size"],
                  progress_data=None, thread_status=None, error_queue=None,
                  max_retries=DEFAULT_CONFIG["extra_config"]["max_retries"],
                  stop_event=None):
    """Download bytes ``start``..``end`` of *url* into ``<file_path>.part<index>``.

    Args:
        url: The file URL.
        start, end: Inclusive byte range to request.
        index: Part number; also the index into *progress_data*.
        file_path: Base path of the part files.
        chunk_size: Streaming chunk size in bytes.
        progress_data: Optional list of ``{'downloaded': int}`` dicts.
        thread_status: Optional ``{'status': ...}`` dict updated in place.
        error_queue: Optional queue receiving error descriptions.
        max_retries: Number of retries after the first failed attempt.
        stop_event: Optional ``threading.Event``; when set the download stops.

    Returns:
        True when the part was written completely, otherwise False.
    """
    headers = {'Range': f'bytes={start}-{end}', 'User-Agent': _setting("extra_config", "user_agent")}
    timeout = _setting("advanced_config", "timeout")
    retry_delay = _setting("advanced_config", "retry_delay")

    def stopped():
        return stop_event is not None and stop_event.is_set()

    def set_status(status):
        if thread_status is not None:
            thread_status['status'] = status

    retries = 0
    while retries <= max_retries and not stopped():
        try:
            set_status(ThreadStatus.RUNNING)
            # Every attempt rewrites the part file from scratch, so the
            # counter must restart too or the total would be overstated.
            if progress_data is not None:
                progress_data[index]['downloaded'] = 0
            with requests.get(url, headers=headers, stream=True, timeout=timeout) as r:
                r.raise_for_status()
                if r.status_code != 206:
                    # The server ignored the Range header; storing the whole
                    # body as one part would corrupt the merged file.
                    raise OSError(f"服务器未返回分段内容 (HTTP {r.status_code})")
                with open(f'{file_path}.part{index}', 'wb') as f:
                    for chunk in r.iter_content(chunk_size=chunk_size):
                        if stopped():
                            set_status(ThreadStatus.STOPPED)
                            return False
                        if chunk:
                            f.write(chunk)
                            if progress_data is not None:
                                progress_data[index]['downloaded'] += len(chunk)
            set_status(ThreadStatus.STOPPED)
            return True  # download succeeded
        except Exception as e:
            if isinstance(e, requests.exceptions.RequestException):
                message = f"线程 {index} 下载失败 (网络错误): {e}, 重试 {retries}/{max_retries}"
            else:
                message = f"线程 {index} 下载失败: {e}, 重试 {retries}/{max_retries}"
            logging.error(message)
            set_status(ThreadStatus.ERROR)
            if error_queue is not None:
                error_queue.put(f"{message} - {url} - {start}-{end}")
            retries += 1
            if retries <= max_retries:
                # Wait before the next attempt, waking up early on abort.
                if stop_event is not None:
                    stop_event.wait(retry_delay)
                else:
                    time.sleep(retry_delay)

    # Retries exhausted, or the download was aborted.
    set_status(ThreadStatus.STOPPED if stopped() else ThreadStatus.ERROR)
    return False


def single_thread_download(url, download_dir, file_name, error_queue):
    """Download *url* sequentially to ``download_dir/file_name``.

    Errors are logged and put on *error_queue*.

    Returns:
        True on success, False on any failure.
    """
    full_path = os.path.join(download_dir, file_name)
    try:
        os.makedirs(download_dir, exist_ok=True)
        with requests.get(url, stream=True, timeout=_setting("advanced_config", "timeout"),
                          headers={'User-Agent': _setting("extra_config", "user_agent")}) as r:
            r.raise_for_status()
            total_length = r.headers.get('Content-Length')
            if total_length is None:
                print(f"{Fore.YELLOW}Content-Length 未知,无法显示进度。{Style.RESET_ALL}")
            else:
                total_length = int(total_length)
            with open(full_path, 'wb') as f:
                dl = 0
                for chunk in r.iter_content(chunk_size=_setting("advanced_config", "chunk_size")):
                    if not chunk:
                        continue
                    f.write(chunk)
                    dl += len(chunk)
                    if total_length:
                        done = int(50 * dl / total_length)
                        bar = '=' * done + ' ' * (50 - done)
                        percent = 100 * dl / total_length
                        # '\r' redraws the bar in place on the same line.
                        print(f"\r[{bar}] {percent:.2f}% ({format_file_size(dl)}/{format_file_size(total_length)})", end='')
            print()  # finish the progress-bar line
            print(f"{Fore.GREEN}单线程下载完成:{full_path}{Style.RESET_ALL}")
    except requests.exceptions.RequestException as e:
        logging.error(f"单线程下载失败 (网络错误): {e}")
        error_queue.put(f"单线程下载失败 (网络错误): {e}")
        return False
    except Exception as e:
        logging.error(f"单线程下载失败: {e}")
        error_queue.put(f"单线程下载失败: {e}")
        return False
    return True


def merge_files(file_path, total_parts):
    """Concatenate ``<file_path>.part0..N-1`` into *file_path*.

    Each part file is deleted after it has been appended. Failures are
    logged and re-raised.
    """
    try:
        with open(file_path, 'wb') as out_file:
            for i in range(total_parts):
                part_name = f'{file_path}.part{i}'
                with open(part_name, 'rb') as part_file:
                    # Stream the part instead of loading it into memory.
                    copyfileobj(part_file, out_file)
                os.remove(part_name)
    except Exception as e:
        logging.error(f"合并文件失败: {e}")
        raise


def _open_error_log_viewer(error_log_file_path):
    """Open the error log in Notepad on Windows; return the process or None."""
    if os.name != 'nt':
        return None
    try:
        # Make sure the file exists so Notepad does not ask to create it.
        with open(error_log_file_path, "a", encoding='utf-8'):
            pass
        return subprocess.Popen(["notepad.exe", error_log_file_path])
    except OSError as e:
        logging.warning(f"无法打开错误输出窗口: {e}")
        return None


def _render_progress(file_name, file_size, download_dir, ranges, progress_data,
                     thread_statuses, download_speed):
    """Redraw the overall and per-thread progress display."""
    total_downloaded = sum(p['downloaded'] for p in progress_data)
    progress_percent = min(100.0, (total_downloaded / file_size) * 100)

    clear_screen()
    print(_SEPARATOR)
    print(f"文件名: {file_name}")
    print(f"大小: {format_file_size(file_size)}")
    print(f"线程数: {len(ranges)}")
    print(f"保存路径: {os.path.join(download_dir, file_name)}")
    print(_SEPARATOR)
    print(f"当前总进度:{progress_percent:.2f}%")
    filled_length = int(_BAR_LENGTH * progress_percent / 100)
    bar = '=' * filled_length + '-' * (_BAR_LENGTH - filled_length)
    print(f"[{bar}]")
    print(_SEPARATOR)

    # Collect the per-thread columns first so they can be aligned.
    rows = []
    for i, (start, end) in enumerate(ranges):
        downloaded = progress_data[i]['downloaded']
        # Progress is relative to this thread's own part, not the whole file.
        fraction = min(1.0, downloaded / (end - start + 1))
        rows.append((f"线程{i+1}", thread_statuses[i]['status'], fraction,
                     f"({downloaded}字节)({format_file_size(downloaded)})"))
    label_width = max(len(row[0]) for row in rows)
    status_width = max(len(row[1]) for row in rows)
    for label, status, fraction, downloaded_str in rows:
        filled = int(_BAR_LENGTH * fraction)
        print(f"{label:<{label_width}}: {status:<{status_width}} [{'-' * filled}=>{'-' * (_BAR_LENGTH - filled)}] {downloaded_str}")
    print(_SEPARATOR)
    print(f"当前速度: {format_file_size(download_speed)}/s")


def _download_in_parts(url, file_size, num_threads, download_dir, temp_download_dir,
                       file_name, error_queue):
    """Download *url* with parallel range requests and merge the parts.

    Returns:
        True on success, False when a part failed or merging failed, and
        None when the user aborted the download with Ctrl+C.
    """
    full_path = os.path.join(temp_download_dir, file_name)
    ranges = compute_part_ranges(file_size, num_threads)
    part_count = len(ranges)
    chunk_size = _setting("advanced_config", "chunk_size")
    retry_delay = _setting("advanced_config", "retry_delay")
    thread_sync = _setting("advanced_config", "thread_sync")
    max_retries = _setting("extra_config", "max_retries")

    progress_data = [{'downloaded': 0} for _ in ranges]
    thread_statuses = [{'status': ThreadStatus.STOPPED} for _ in ranges]
    results = [None] * part_count  # return value of download_part per part
    workers = [None] * part_count
    # Bound the restarts so a dead server cannot keep the loop alive forever.
    restarts_left = [max(1, max_retries)] * part_count
    stop_event = threading.Event()

    def run_part(index):
        start, end = ranges[index]
        results[index] = download_part(url, start, end, index, full_path, chunk_size,
                                       progress_data, thread_statuses[index], error_queue,
                                       max_retries, stop_event)

    def start_worker(index):
        results[index] = None
        worker = threading.Thread(target=run_part, args=(index,), name=f"Thread-{index + 1}")
        workers[index] = worker  # replace, never append, so indices stay valid
        worker.start()

    def on_sigint(sig, frame):
        print(f"{Fore.YELLOW}接收到中断信号,正在中止下载...{Style.RESET_ALL}")
        stop_event.set()

    handler_installed = False
    previous_handler = None
    try:
        previous_handler = signal.signal(signal.SIGINT, on_sigint)
        handler_installed = True
    except ValueError:
        # signal.signal() only works in the main thread.
        logging.warning("无法注册中断信号处理程序")

    try:
        for index in range(part_count):
            start_worker(index)

        last_total = 0
        last_update = time.time()
        while not stop_event.is_set():
            # With thread sync enabled, restart workers that have given up.
            if thread_sync:
                for index, worker in enumerate(workers):
                    if worker.is_alive() or results[index] is not False or restarts_left[index] <= 0:
                        continue
                    restarts_left[index] -= 1
                    print(f"{Fore.RED}线程 {index+1} 发生错误,将在 {retry_delay} 秒后重启...{Style.RESET_ALL}")
                    if stop_event.wait(retry_delay):
                        break
                    start_worker(index)
            if not any(worker.is_alive() for worker in workers):
                break
            now = time.time()
            elapsed = now - last_update
            if elapsed >= 1.0:
                total = sum(p['downloaded'] for p in progress_data)
                download_speed = max(0, total - last_total) / elapsed
                last_total, last_update = total, now
                _render_progress(file_name, file_size, download_dir, ranges,
                                 progress_data, thread_statuses, download_speed)
            time.sleep(0.1)  # keep CPU usage low

        for worker in workers:
            worker.join()
    finally:
        if handler_installed and previous_handler is not None:
            signal.signal(signal.SIGINT, previous_handler)

    if stop_event.is_set():
        print(f"{Fore.RED}下载已中止。{Style.RESET_ALL}")
        return None
    if not all(results):
        print(f"{Fore.RED}下载失败,请检查网络或文件是否有效。{Style.RESET_ALL}")
        return False

    print(f"{Fore.GREEN}合并分段文件中...{Style.RESET_ALL}")
    try:
        merge_files(full_path, part_count)
        # Move the merged file to the target directory.
        destination_path = os.path.join(download_dir, file_name)
        move(full_path, destination_path)
        print(f"{Fore.GREEN}下载完成:{destination_path}{Style.RESET_ALL}")
        return True
    except Exception as e:
        print(f"{Fore.RED}合并或移动文件失败: {e}{Style.RESET_ALL}")
        return False


def multi_thread_download(url, num_threads, download_dir, file_name, use_single_thread=False):
    """Download *url* to ``download_dir/file_name``, in parallel when possible.

    Falls back to a sequential download when *use_single_thread* is set, the
    server does not support Range requests, or the file is empty. Errors are
    appended to ``downloader_errors.log`` in the temp directory.

    Returns:
        True on success, False when a download was attempted and failed, and
        None when nothing was attempted (not enough disk space) or the user
        aborted.
    """
    start_time = time.time()
    temp_download_dir = ""
    outcome = None
    error_queue = queue.Queue()  # carries error messages from the workers

    # Open the error output window.
    error_log_file_path = os.path.join(os.environ.get("TEMP", "/tmp"), "downloader_errors.log")
    error_log_process = _open_error_log_viewer(error_log_file_path)

    try:
        file_size, support_range = get_file_info(
            url, timeout=_setting("advanced_config", "timeout"), is_single_thread=use_single_thread)
        if use_single_thread or not support_range or file_size <= 0:
            if not support_range and not use_single_thread:
                print(f"{Fore.YELLOW}服务器不支持 Range 请求,尝试单线程下载{Style.RESET_ALL}")
            outcome = single_thread_download(url, download_dir, file_name, error_queue)
        else:
            # Part files are collected in a "Crdownload" directory.
            temp_download_dir = os.path.join(download_dir, "Crdownload")
            os.makedirs(temp_download_dir, exist_ok=True)
            space_ok, space_message = check_disk_space(file_size, download_dir)
            if not space_ok:
                print(f"{Fore.RED}{space_message}{Style.RESET_ALL}")
            else:
                outcome = _download_in_parts(url, file_size, num_threads, download_dir,
                                             temp_download_dir, file_name, error_queue)
    except Exception as e:
        outcome = False
        logging.error(f"下载失败: {e}")
        with open(error_log_file_path, "a", encoding='utf-8') as f:
            f.write(f"下载失败: {e}\n")
            traceback.print_exc(file=f)  # include the stack trace
    finally:
        if error_log_process:
            error_log_process.terminate()  # close the error output window
        if temp_download_dir:
            try:
                rmtree(temp_download_dir)
                print(f"{Fore.GREEN}已删除临时下载目录{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}删除临时下载目录失败: {e}{Style.RESET_ALL}")
        logging.info(f"下载总耗时: {time.time() - start_time:.2f}秒")
        # Persist the errors reported by the workers.
        try:
            with open(error_log_file_path, "a", encoding='utf-8') as f:
                while not error_queue.empty():
                    f.write(f"{error_queue.get()}\n")
        except OSError as e:
            logging.error(f"写入错误日志失败: {e}")
    return outcome


def clear_screen():
    """Clear the terminal."""
    os.system('cls' if os.name == 'nt' else 'clear')


def _is_positive_int(value):
    return isinstance(value, int) and not isinstance(value, bool) and value > 0


def _is_non_negative_int(value):
    return isinstance(value, int) and not isinstance(value, bool) and value >= 0


def _is_positive_number(value):
    # The chained comparison also rejects NaN and infinity.
    return isinstance(value, (int, float)) and not isinstance(value, bool) and 0 < value < float("inf")


def sanitize_config(config):
    """Return a validated deep copy of *config*.

    Invalid or missing entries are replaced by the defaults (a warning is
    printed for each). ``DEFAULT_CONFIG`` itself is never returned or
    aliased, so the result can be modified freely.
    """
    def warn(label):
        print(f"{Fore.YELLOW}配置错误:{label}无效,使用默认值。{Style.RESET_ALL}")

    clean = copy.deepcopy(config) if isinstance(config, dict) else {}

    top_level_rules = (
        ("num_threads", "线程数", _is_positive_int),
        ("download_dir", "下载目录", lambda v: isinstance(v, str) and os.path.isdir(v)),
    )
    for key, label, is_valid in top_level_rules:
        if not is_valid(clean.get(key)):
            warn(label)
            clean[key] = DEFAULT_CONFIG[key]

    section_rules = {
        "advanced_config": ("高级配置", (
            ("chunk_size", _is_positive_int),
            ("timeout", _is_positive_number),
            ("retry_delay", _is_positive_number),
            ("thread_sync", lambda v: isinstance(v, bool)),
        )),
        "extra_config": ("额外配置", (
            ("max_retries", _is_non_negative_int),
            ("user_agent", lambda v: isinstance(v, str)),
        )),
    }
    for section, (label, rules) in section_rules.items():
        if not isinstance(clean.get(section), dict):
            warn(label)
            clean[section] = copy.deepcopy(DEFAULT_CONFIG[section])
            continue
        for key, is_valid in rules:
            if not is_valid(clean[section].get(key)):
                warn(f"{key} ")
                clean[section][key] = DEFAULT_CONFIG[section][key]
    return clean


def load_config():
    """Load and validate the configuration file.

    Returns:
        The validated configuration, or a copy of the defaults when the file
        is missing, malformed or unreadable.
    """
    try:
        with open(CONFIG_FILE, "r", encoding='utf-8') as f:
            loaded = json.load(f)
        return sanitize_config(loaded)
    except FileNotFoundError:
        print(f"{Fore.YELLOW}未找到配置文件,使用默认配置。{Style.RESET_ALL}")
    except json.JSONDecodeError:
        print(f"{Fore.RED}配置文件格式错误,使用默认配置。{Style.RESET_ALL}")
        if input("是否删除配置文件并使用默认配置? (y/n): ").strip().lower() == 'y':
            try:
                os.remove(CONFIG_FILE)
                print(f"{Fore.GREEN}已删除损坏的配置文件。{Style.RESET_ALL}")
            except Exception as e:
                print(f"{Fore.RED}删除配置文件失败: {e}{Style.RESET_ALL}")
    except Exception as e:
        print(f"{Fore.RED}加载配置失败: {e},使用默认配置。{Style.RESET_ALL}")
    # Hand out a copy so that menu edits never modify DEFAULT_CONFIG.
    return copy.deepcopy(DEFAULT_CONFIG)


def save_config(config):
    """Write *config* to the configuration file as JSON."""
    try:
        with open(CONFIG_FILE, "w", encoding='utf-8') as f:
            json.dump(config, f, indent=4)
        print(f"{Fore.GREEN}配置已保存到 {CONFIG_FILE}{Style.RESET_ALL}")
    except Exception as e:
        print(f"{Fore.RED}保存配置失败: {e}{Style.RESET_ALL}")


def _prompt_setting(target, key, prompt, cast, is_valid, range_message, invalid_message, config):
    """Ask for one numeric setting, store it in ``target[key]`` and save."""
    try:
        value = cast(input(prompt).strip())
    except ValueError:
        print(invalid_message)
        time.sleep(1)
        return
    if not is_valid(value):
        print(range_message)
        time.sleep(1)
        return
    target[key] = value
    save_config(config)


def _advanced_menu(config):
    """Run the advanced-settings submenu until the user returns."""
    advanced = config["advanced_config"]
    extra = config["extra_config"]
    while True:
        clear_screen()
        print("高级配置")
        print(f"1. Chunk Size (当前: {advanced['chunk_size']})")
        print(f"2. Timeout (当前: {advanced['timeout']})")
        print(f"3. Retry Delay (当前: {advanced['retry_delay']} 秒)")
        print(f"4. 线程同步 (当前: {advanced['thread_sync']})")
        print(f"5. 最大重试次数 (当前: {extra['max_retries']})")
        print(f"6. User-Agent (当前: {extra['user_agent']})")
        print("0. 返回主菜单")
        sub_choice = input("请选择: ").strip()
        clear_screen()
        if sub_choice == '1':
            _prompt_setting(advanced, "chunk_size", "请输入Chunk Size (字节): ", int, _is_positive_int,
                            "Chunk Size 必须为正整数。", "无效的Chunk Size。", config)
        elif sub_choice == '2':
            _prompt_setting(advanced, "timeout", "请输入Timeout (秒): ", float, _is_positive_number,
                            "Timeout 必须为正数。", "无效的Timeout。", config)
        elif sub_choice == '3':
            _prompt_setting(advanced, "retry_delay", "请输入重试延迟 (秒): ", float, _is_positive_number,
                            "重试延迟必须为正数。", "无效的重试延迟。", config)
        elif sub_choice == '4':
            advanced["thread_sync"] = not advanced["thread_sync"]
            print(f"线程同步已设置为: {advanced['thread_sync']}")
            save_config(config)
        elif sub_choice == '5':
            _prompt_setting(extra, "max_retries", "请输入最大重试次数: ", int, _is_non_negative_int,
                            "最大重试次数必须为非负整数。", "无效的最大重试次数。", config)
        elif sub_choice == '6':
            user_agent = input("请输入 User-Agent: ").strip()
            if user_agent:  # an empty answer keeps the current value
                extra["user_agent"] = user_agent
                save_config(config)
        elif sub_choice == '0':
            return
        else:
            print("无效的选择,请重新输入。")
            time.sleep(1)


def _confirm_server_reachable(url):
    """Ping the server of *url*; on failure ask whether to continue anyway."""
    is_available, message = check_server_availability(url)
    if is_available:
        print(f"{Fore.GREEN}目标服务器访问成功: {message}{Style.RESET_ALL}")
        return True
    print(f"{Fore.RED}目标服务器访问失败: {message}{Style.RESET_ALL}")
    skip_check = input("是否跳过服务器可用性检查?(y/n/s - Skip): ").strip().lower()
    return skip_check in ('s', 'y')


def _start_download(url, config):
    """Set up logging, activate *config* and download *url*."""
    file_name = os.path.basename(urlsplit(url).path) or "downloaded_file"
    setup_logging()
    setup_full_log(file_name)
    # Make the user's advanced settings effective for this download.
    _activate_config(config)
    multi_thread_download_wrapper(url, config["num_threads"], config["download_dir"], file_name)


def menu():
    """Run the interactive main menu.

    Returns after a download has been started; exits the process when the
    user chooses to quit.
    """
    config = load_config()

    # Suggest setting a User-Agent while the default one is still in use.
    show_user_agent_hint = config["extra_config"]["user_agent"] == DEFAULT_CONFIG["extra_config"]["user_agent"]

    while True:
        clear_screen()
        if show_user_agent_hint:
            # Printed after clearing the screen so the hint stays visible.
            print(f"{Fore.YELLOW}建议您设置 User-Agent 以提高下载成功率,请在高级配置中设置。{Style.RESET_ALL}")
            show_user_agent_hint = False
        print("PyDownloader 菜单")
        print("1. 开始下载")
        print("2. 配置线程数 (当前: {})".format(config["num_threads"]))
        print("3. 配置下载目录 (当前: {})".format(config["download_dir"]))
        print("4. 高级配置")
        print("0. 退出")
        choice = input("请选择: ").strip()
        clear_screen()

        if choice == '1' or validate_url(choice):
            # A URL typed directly at the prompt starts the download at once.
            url = choice if validate_url(choice) else input("请输入下载链接: ").strip()
            if not url:
                continue
            if not validate_url(url):
                url = add_protocol_if_missing(url)  # add the scheme automatically
                if not _confirm_server_reachable(url):
                    continue
            try:
                _start_download(url, config)
                break
            except Exception as e:
                print(f"{Fore.RED}下载失败: {e}{Style.RESET_ALL}")
                logging.error(f"下载失败: {e}")
                logging.exception("详细错误信息:")
                time.sleep(1)
        elif choice == '2':
            _prompt_setting(config, "num_threads", "请输入线程数: ", int, _is_positive_int,
                            "线程数必须为正整数。", "无效的线程数。", config)
        elif choice == '3':
            download_dir = input("请输入下载文件夹路径: ").strip()
            if not download_dir:
                download_dir = get_default_download_path()
            try:
                # load_config() rejects directories that do not exist.
                os.makedirs(download_dir, exist_ok=True)
            except OSError as e:
                print(f"{Fore.RED}无法创建下载目录: {e}{Style.RESET_ALL}")
                time.sleep(1)
                continue
            config["download_dir"] = download_dir
            save_config(config)
        elif choice == '4':
            _advanced_menu(config)
        elif choice == '0':
            sys.exit(0)
        else:
            print("无效的选择,请重新输入。")
            time.sleep(1)


def multi_thread_download_wrapper(url, num_threads, download_dir, file_name):
    """Download *url*, falling back to single-threaded and browser download.

    The fallback runs when the multi-threaded download raises or reports a
    failure; it is skipped when the download succeeded, was aborted by the
    user or was never attempted.
    """
    try:
        outcome = multi_thread_download(url, num_threads, download_dir, file_name)
        if outcome is not False:
            return
        failure = "下载未完成"
    except Exception as e:
        failure = e
        logging.exception("多线程下载详细错误信息:")

    print(f"{Fore.RED}多线程下载失败: {failure}, 尝试单线程下载...{Style.RESET_ALL}")
    logging.error(f"多线程下载失败: {failure}")
    try:
        # single_thread_download() needs a queue for its error reports.
        fallback_errors = queue.Queue()
        download_success = single_thread_download(url, download_dir, file_name, fallback_errors)
        if not download_success:
            print(f"{Fore.RED}单线程下载也失败,请尝试使用浏览器下载:{url}{Style.RESET_ALL}")
            if input("是否启动浏览器下载? (y/n): ").strip().lower() == 'y':
                webbrowser.open(url)
    except Exception as e:
        print(f"{Fore.RED}单线程下载失败: {e}, 无法启动浏览器下载{Style.RESET_ALL}")
        logging.error(f"单线程下载失败: {e}")
        logging.exception("单线程下载详细错误信息:")
        print(f"{Fore.RED}请手动使用浏览器下载:{url}{Style.RESET_ALL}")


def main():
    """Program entry point: fix the output encoding, check modules, run menu."""
    # Make sure the output encoding is UTF-8.
    encoding = (getattr(sys.stdout, "encoding", "") or "").lower().replace("-", "")
    if encoding != "utf8":
        if hasattr(sys.stdout, "reconfigure"):
            # Keeps the existing stream object (and any wrapper around it).
            sys.stdout.reconfigure(encoding="utf-8")
        else:
            sys.stdout = codecs.getwriter('utf-8')(sys.stdout.buffer, 'strict')

    check_and_install_modules()
    menu()


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        print("\n已取消操作。")
    except Exception as e:
        print(f"{Fore.RED}发生错误:{e}{Style.RESET_ALL}")
        logging.exception(f"程序发生严重错误: {e}")