From 1175e6c75c4282cc944b8765fa340fc1eac78c8e Mon Sep 17 00:00:00 2001 From: PAN Lei Date: Sat, 25 Apr 2026 11:31:56 +0200 Subject: [PATCH] try to optimaze mail read --- src/mail/imap_fingerprint.py | 346 +++++++++++++++++++++++++++++ src/mail/imap_proxy_reader.py | 22 +- src/mail/mail_confirmation.py | 80 ++++++- src/mail/provider_strategy.py | 407 ++++++++++++++++++++++++++++++++++ 4 files changed, 839 insertions(+), 16 deletions(-) create mode 100644 src/mail/imap_fingerprint.py create mode 100644 src/mail/provider_strategy.py diff --git a/src/mail/imap_fingerprint.py b/src/mail/imap_fingerprint.py new file mode 100644 index 0000000..8e7a736 --- /dev/null +++ b/src/mail/imap_fingerprint.py @@ -0,0 +1,346 @@ +""" +imap_fingerprint.py +=================== +IMAP 客户端指纹伪装(IMAP ID 命令,RFC 2971)。 + +IMAP ID 命令允许客户端向服务器发送标识信息,服务器可能用它来: +- 统计客户端类型 +- 提供不同的功能或限制 +- 进行风控检测 + +通过伪装常见邮件客户端的指纹,可以避免被识别为自动化脚本。 + +支持的客户端指纹: +- Thunderbird (Mozilla) +- Microsoft Outlook +- Apple Mail +- Gmail (通过 IMAP) +- Yahoo Mail +- GMX Mail Client +- 通用 IMAP 客户端 +""" + +import random +from dataclasses import dataclass +from typing import Dict, List, Optional + +from src.mail.mail_constants import ( + DOMAIN_YAHOO, DOMAIN_GMX, DOMAIN_GMX_DE, DOMAIN_GMX_NET, + DOMAIN_GMX_FR, DOMAIN_GMX_AT, DOMAIN_GMX_CH, DOMAIN_GMX_US, + DOMAIN_GMX_PT, DOMAIN_GMX_SG, DOMAIN_GMAIL, DOMAIN_HOTMAIL, + DOMAIN_OUTLOOK_COM, DOMAIN_WEB_DE, DOMAIN_163, DOMAIN_RAMBLER_RU, + DOMAIN_NAVER, DOMAIN_ONET, DOMAIN_SINA, +) + + +@dataclass +class ImapFingerprint: + """ + IMAP 客户端指纹配置。 + + Attributes + ---------- + name : str + 客户端名称 + version : str + 客户端版本 + vendor : str + 供应商名称 + support_email : str + 支持邮箱地址 + os : str + 操作系统(可选) + os_version : str + 操作系统版本(可选) + """ + name: str + version: str + vendor: str = "" + support_email: str = "" + os: str = "" + os_version: str = "" + + def to_id_params(self) -> Dict[str, str]: + """转换为 IMAP ID 命令参数""" + params = { + "name": self.name, + "version": self.version, + } + if self.vendor: + params["vendor"] = self.vendor + if self.support_email: + params["support-email"] = self.support_email + if self.os: + params["os"] = self.os + if self.os_version: + params["os-version"] = self.os_version + return params + + def to_id_string(self) -> str: + """转换为 IMAP ID 命令字符串格式""" + params = self.to_id_params() + items = [] + for key, value in params.items(): + items.append(f'"{key}" "{value}"') + return " " + " ".join(items) + + +THUNDERBIRD_VERSIONS = [ + "115.10.1", "115.9.1", "115.8.1", "115.7.0", "115.6.0", + "102.15.1", "102.14.0", "102.13.0", "102.12.0", + "91.13.1", "91.12.0", "91.11.0", +] + +OUTLOOK_VERSIONS = [ + "16.0.17126.20132", "16.0.16827.20166", "16.0.16724.20182", + "16.0.16626.20164", "16.0.16529.20154", "16.0.16425.20122", + "15.0.5153.1000", "15.0.5041.1000", "15.0.4937.1000", + "14.0.7232.5000", "14.0.7172.5000", +] + +APPLE_MAIL_VERSIONS = [ + "16.0", "15.0", "14.0", "13.0", "12.0", "11.0", + "3736.500.121.1.1", "3736.400.56", "3731.600.57", +] + +GMX_VERSIONS = [ + "7.5.1", "7.5.0", "7.4.2", "7.4.1", "7.4.0", + "7.3.5", "7.3.4", "7.3.3", "7.3.2", "7.3.1", +] + +YAHOO_MAIL_VERSIONS = [ + "2.9.0", "2.8.5", "2.8.0", "2.7.5", "2.7.0", + "1.0.0", +] + +GMAIL_IMAP_VERSIONS = [ + "2.1.6", "2.1.5", "2.1.4", "2.1.3", "2.1.2", "2.1.1", "2.1.0", +] + +WINDOWS_VERSIONS = ["Windows 10", "Windows 11", "Windows 8.1", "Windows 7"] +MACOS_VERSIONS = ["macOS 14.4", "macOS 14.3", "macOS 13.6", "macOS 13.5", "macOS 12.7"] +LINUX_VERSIONS = ["Ubuntu 22.04", "Ubuntu 20.04", "Debian 12", "Fedora 39"] + + +DEFAULT_FINGERPRINTS: Dict[str, List[ImapFingerprint]] = { + "thunderbird_windows": [ + ImapFingerprint( + name="Thunderbird", + version=random.choice(THUNDERBIRD_VERSIONS), + vendor="Mozilla", + support_email="tb-feedback@mozilla.org", + os="Windows", + os_version=random.choice(WINDOWS_VERSIONS), + ) for _ in range(3) + ], + "thunderbird_mac": [ + ImapFingerprint( + name="Thunderbird", + version=random.choice(THUNDERBIRD_VERSIONS), + vendor="Mozilla", + support_email="tb-feedback@mozilla.org", + os="MacOS", + os_version=random.choice(MACOS_VERSIONS), + ) for _ in range(3) + ], + "outlook": [ + ImapFingerprint( + name="Microsoft Outlook", + version=random.choice(OUTLOOK_VERSIONS), + vendor="Microsoft", + support_email="outlook@microsoft.com", + os="Windows", + os_version=random.choice(WINDOWS_VERSIONS), + ) for _ in range(3) + ], + "apple_mail": [ + ImapFingerprint( + name="Apple Mail", + version=random.choice(APPLE_MAIL_VERSIONS), + vendor="Apple", + support_email="mail@apple.com", + os="MacOS", + os_version=random.choice(MACOS_VERSIONS), + ) for _ in range(3) + ], + "gmx_client": [ + ImapFingerprint( + name="GMX Mail", + version=random.choice(GMX_VERSIONS), + vendor="GMX", + support_email="support@gmx.com", + ) for _ in range(3) + ], + "yahoo_client": [ + ImapFingerprint( + name="YahooMailIMAP", + version=random.choice(YAHOO_MAIL_VERSIONS), + vendor="Yahoo", + support_email="imap-support@yahoo.com", + ) for _ in range(3) + ], + "gmail_imap": [ + ImapFingerprint( + name="GmailIMAP", + version=random.choice(GMAIL_IMAP_VERSIONS), + vendor="Google", + support_email="imap-support@google.com", + ) for _ in range(3) + ], + "generic": [ + ImapFingerprint( + name="IMAPClient", + version="1.0.0", + vendor="Generic", + ) + ], +} + + +PROVIDER_FINGERPRINT_MAP: Dict[str, List[str]] = { + DOMAIN_GMAIL: ["gmail_imap", "thunderbird_windows", "thunderbird_mac", "apple_mail"], + DOMAIN_YAHOO: ["yahoo_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_HOTMAIL: ["outlook", "thunderbird_windows"], + DOMAIN_OUTLOOK_COM: ["outlook", "thunderbird_windows"], + DOMAIN_GMX: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_DE: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_NET: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_FR: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_AT: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_CH: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_US: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_PT: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_GMX_SG: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_WEB_DE: ["gmx_client", "thunderbird_windows", "thunderbird_mac"], + DOMAIN_163: ["thunderbird_windows", "generic"], + DOMAIN_RAMBLER_RU: ["thunderbird_windows", "generic"], + DOMAIN_NAVER: ["thunderbird_windows", "thunderbird_mac", "generic"], + DOMAIN_ONET: ["thunderbird_windows", "thunderbird_mac", "generic"], + DOMAIN_SINA: ["thunderbird_windows", "generic"], +} + + +def get_fingerprint_for_provider(login: str) -> ImapFingerprint: + """ + 根据邮箱地址获取合适的伪装指纹。 + + Parameters + ---------- + login : str + 邮箱地址 + + Returns + ------- + ImapFingerprint + 伪装的 IMAP 客户端指纹 + """ + login_lower = login.lower() + + fingerprint_keys = ["generic"] + + for domain, keys in PROVIDER_FINGERPRINT_MAP.items(): + if domain in login_lower: + fingerprint_keys = keys + break + + selected_key = random.choice(fingerprint_keys) + fingerprints = DEFAULT_FINGERPRINTS.get(selected_key, DEFAULT_FINGERPRINTS["generic"]) + + return random.choice(fingerprints) + + +def get_random_fingerprint() -> ImapFingerprint: + """ + 获取随机伪装指纹。 + + Returns + ------- + ImapFingerprint + 随机选择的 IMAP 客户端指纹 + """ + all_keys = [ + "thunderbird_windows", "thunderbird_mac", "outlook", "apple_mail", + "gmx_client", "yahoo_client", "gmail_imap", + ] + selected_key = random.choice(all_keys) + fingerprints = DEFAULT_FINGERPRINTS.get(selected_key, DEFAULT_FINGERPRINTS["generic"]) + return random.choice(fingerprints) + + +def send_imap_id(imap_client, fingerprint: Optional[ImapFingerprint] = None) -> bool: + """ + 发送 IMAP ID 命令来伪装客户端指纹。 + + Parameters + ---------- + imap_client : IMAPClient 或 imaplib.IMAP4 + IMAP 客户端实例 + fingerprint : ImapFingerprint, optional + 要伪装的指纹,如果 None 则随机选择 + + Returns + ------- + bool + 是否成功发送 ID 命令 + """ + if fingerprint is None: + fingerprint = get_random_fingerprint() + + try: + if hasattr(imap_client, 'id_'): + params = fingerprint.to_id_params() + result = imap_client.id_(params) + logger.info(f"发送 IMAP ID 命令成功: {fingerprint.name} v{fingerprint.version}") + return True + elif hasattr(imap_client, 'send'): + id_string = fingerprint.to_id_string() + imap_client.send(f"ID{id_string}\r\n".encode()) + response = imap_client.readline() + logger.info(f"发送 IMAP ID 命令成功 (原生): {fingerprint.name} v{fingerprint.version}") + return True + else: + logger.warning("IMAP 客户端不支持 ID 命令") + return False + except Exception as e: + logger.warning(f"发送 IMAP ID 命令失败: {e}") + return False + + +logger = None + + +def init_logger(): + import logging + import sys + global logger + logger = logging.getLogger(__name__) + logger.setLevel(logging.INFO) + logger.addHandler(logging.StreamHandler(stream=sys.stdout)) + + +init_logger() + + +if __name__ == "__main__": + test_emails = [ + "user@gmail.com", + "user@yahoo.com", + "user@gmx.de", + "user@outlook.com", + "user@hotmail.com", + "user@163.com", + "user@web.de", + "user@unknown.com", + ] + + print("\nIMAP 指纹伪装测试:") + print("=" * 70) + for email in test_emails: + fp = get_fingerprint_for_provider(email) + print(f"{email:30} → {fp.name:20} v{fp.version:15} ({fp.os or 'N/A'})") + + print("\n随机指纹样本:") + print("=" * 70) + for i in range(10): + fp = get_random_fingerprint() + print(f" {fp.name:20} v{fp.version:15} vendor={fp.vendor:10} os={fp.os or 'N/A'}") \ No newline at end of file diff --git a/src/mail/imap_proxy_reader.py b/src/mail/imap_proxy_reader.py index 802028e..418a9f4 100644 --- a/src/mail/imap_proxy_reader.py +++ b/src/mail/imap_proxy_reader.py @@ -33,6 +33,8 @@ import socks from dotenv import load_dotenv from imapclient import IMAPClient +from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id + load_dotenv() # ────────────────────────────────────────────────────────────── @@ -221,6 +223,8 @@ class ProxyIMAPClient(IMAPClient): Accessibles via ``client.subjects``. Utilisés par ``search_by_subjects()`` pour construire automatiquement les critères IMAP SUBJECT. + fingerprint : ImapFingerprint, optional + IMAP 客户端指纹伪装(自动根据邮箱地址选择)。 """ def __init__( @@ -228,11 +232,12 @@ class ProxyIMAPClient(IMAPClient): host: str, proxy: ProxyConfig, subjects: Optional[List[str]] = None, + fingerprint=None, **kwargs, ): self._proxy = proxy - # Sujets à rechercher, injectables depuis l'extérieur self.subjects: List[str] = list(subjects) if subjects else [] + self._fingerprint = fingerprint super().__init__(host, **kwargs) def _create_IMAP4(self): @@ -382,9 +387,13 @@ class ProxyMailReader: # ── Connexion ──────────────────────────────────────────── - def _connect(self) -> ProxyIMAPClient: + def _connect(self, login_email: str = None) -> ProxyIMAPClient: imap_server = get_imap_server(self.account.login) last_exc: Optional[Exception] = None + + fingerprint = None + if login_email: + fingerprint = get_fingerprint_for_provider(login_email) for attempt in range(1, self.max_retries + 1): logger.info( @@ -397,11 +406,16 @@ class ProxyMailReader: host=imap_server, proxy=self.proxy, subjects=self._subjects, + fingerprint=fingerprint, use_uid=True, ssl=True, timeout=self.timeout, ) client.login(self.account.login, self.account.password) + + if fingerprint: + send_imap_id(client, fingerprint) + logger.info( "[%s] Connecté (tentative %d). Sujets recherchés : %s", self.account.login, attempt, self._subjects, @@ -531,8 +545,8 @@ class ProxyMailReader: ] all_results: List[MailResult] = [] - seen_message_ids: set = set() # déduplication inter-dossiers - client = self._connect() + seen_message_ids: set = set() + client = self._connect(login_email=self.account.login) try: folders = self._list_folders(client) diff --git a/src/mail/mail_confirmation.py b/src/mail/mail_confirmation.py index c3ee5e0..1015498 100755 --- a/src/mail/mail_confirmation.py +++ b/src/mail/mail_confirmation.py @@ -2,6 +2,7 @@ import datetime import email import logging import sys +import time from builtins import list from concurrent.futures import ThreadPoolExecutor from email.header import decode_header @@ -11,9 +12,13 @@ from imapclient import IMAPClient from src.db.mirgration.migration_tools import migre_accepted_appointment from src.db.mongo_manager import MONGO_STORE_MANAGER +from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id from src.mail.imap_proxy_reader import ProxyMailReader, MailAccount, ProxyConfig from src.mail.mail_constants import create_imap, show_folders, is_gmx_address, is_yahoo_address from src.mail.mail_reader import get_gmx_proxy_config, get_yahoo_proxy_config +from src.mail.provider_strategy import ( + get_strategy, group_mails_by_provider, apply_delay, ProviderStrategy, +) from src.notification.AcceptedResultPojo import get_accepted_result_from from src.notification.mailer import Mailer from src.pojo.ResultEnum import ResultEnum @@ -53,14 +58,18 @@ class MailConfirmationReader(): def read_emails(self, mails_messages: list) -> list: imap = create_imap(self.login) isImapClient = isinstance(imap, IMAPClient) - # authenticate + + fingerprint = get_fingerprint_for_provider(self.login) + if isImapClient: - # authenticate dat = imap.login(self.login, str(self.password)) print("type is {} for {}".format(dat, self.login)) + send_imap_id(imap, fingerprint) else: responseType, dat = imap.login(self.login, str(self.password)) print("type is {} for {}".format(responseType, self.login)) + send_imap_id(imap, fingerprint) + mail_list = [] print("read mails from {}".format(self.login)) if isImapClient: @@ -210,20 +219,67 @@ def find_confirmation_contacts_for_today(mode: str = 'default'): return _mail_list_to_scan +def read_mail_with_strategy(mail, mails_messages, strategy: ProviderStrategy, proxy_config=None, subjects=None): + """根据策略读取邮箱,包含延迟和重试""" + apply_delay(strategy) + + if proxy_config is not None: + account = MailAccount(login=mail.mail, password=mail.password) + reader = ProxyMailReader( + account, proxy_config, subjects=subjects, + max_retries=strategy.max_retries, + retry_delay=strategy.retry_backoff, + ) + results = reader.read(since=datetime.datetime.today()) + for result in results: + mail_pojo = MailPojo(subject=result.subject, body=result.body, from_address=result.from_address) + mail_pojo.mail_address = mail.mail + mail_pojo.to_address = result.to_address or mail.mail + mail_pojo.isImapClient = True + mails_messages.append(mail_pojo) + else: + mail_reader = MailConfirmationReader(mail.mail, mail.password) + mail_reader.read_emails(mails_messages) + + def find_confirmation_contacts_mail_list(mail_list, subjects: list = None): mails_messages = [] gmx_proxy_config = get_gmx_proxy_config() yahoo_proxy_config = get_yahoo_proxy_config() - # read all the emails - with ThreadPoolExecutor(max_workers=100) as executor: - for mail in mail_list: - if is_gmx_address(mail.mail) and gmx_proxy_config is not None: - executor.submit(read_gmx_proxy_confirmation_emails, mail, mails_messages, gmx_proxy_config, subjects) - elif is_yahoo_address(mail.mail) and yahoo_proxy_config is not None: - executor.submit(read_gmx_proxy_confirmation_emails, mail, mails_messages, yahoo_proxy_config, subjects) - else: - mail_reader = MailConfirmationReader(mail.mail, mail.password) - executor.submit(mail_reader.read_emails, mails_messages) + + grouped_mails = group_mails_by_provider(mail_list) + + for provider_key, provider_mail_list in grouped_mails.items(): + strategy = get_strategy(provider_key) if provider_key in ["gmail", "yahoo", "gmx", "outlook", "163", "rambler", "naver", "onet", "web_de", "inbox_lv", "sina", "pissmail", "default"] else get_strategy(provider_mail_list[0].mail) + print(f"[{strategy.name}] 处理 {len(provider_mail_list)} 个邮箱 (max_workers={strategy.max_workers})") + + with ThreadPoolExecutor(max_workers=strategy.max_workers) as executor: + futures = [] + processed = 0 + + for mail in provider_mail_list: + proxy_config = None + if is_gmx_address(mail.mail) and gmx_proxy_config is not None: + proxy_config = gmx_proxy_config + elif is_yahoo_address(mail.mail) and yahoo_proxy_config is not None: + proxy_config = yahoo_proxy_config + + future = executor.submit( + read_mail_with_strategy, mail, mails_messages, strategy, proxy_config, subjects + ) + futures.append(future) + processed += 1 + + if strategy.should_wait_after_batch(processed): + time.sleep(strategy.batch_delay) + processed = 0 + + for future in futures: + try: + future.result(timeout=strategy.timeout) + except Exception as e: + print(f"读取邮箱出错: {e}") + accepted_appointment_list = [] if len(mails_messages) > 0: successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() diff --git a/src/mail/provider_strategy.py b/src/mail/provider_strategy.py new file mode 100644 index 0000000..3a98775 --- /dev/null +++ b/src/mail/provider_strategy.py @@ -0,0 +1,407 @@ +""" +provider_strategy.py +==================== +不同邮箱供应商的风控策略配置。 + +每个供应商有不同的风控规则,需要针对性地调整: +- 并发限制 +- 请求延迟 +- 重试策略 +- 超时时间 +- 是否使用代理 +""" + +import random +import time +from dataclasses import dataclass +from typing import Dict, List + +from src.mail.mail_constants import ( + DOMAIN_163, DOMAIN_YAHOO, DOMAIN_GMAIL, DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM, + DOMAIN_WEB_DE, DOMAIN_RAMBLER_RU, DOMAIN_NAVER, DOMAIN_ONET, + DOMAIN_GAZETA_PL, DOMAIN_INBOX_LV, DOMAIN_SINA, DOMAIN_PISS_MAIL, DOMAIN_INCEL_EMAIL, + DOMAIN_SHITPOSTING_EXPERT, DOMAIN_HATESJE_WS, DOMAIN_CHILD_PIZZA, + DOMAIN_GENOCIDE_FUN, DOMAIN_DMC_CHAT, GMX_DOMAINS, +) + + +@dataclass +class ProviderStrategy: + """ + 单个邮箱供应商的风控策略。 + + Attributes + ---------- + name : str + 供应商名称(用于日志) + max_workers : int + 该供应商的最大并发线程数 + min_delay : float + 每次操作前的最小延迟(秒) + max_delay : float + 每次操作前的最大延迟(秒) + max_retries : int + 连接失败时的最大重试次数 + retry_backoff : float + 重试时的指数退避基数(秒) + timeout : float + 连接超时时间(秒) + use_proxy : bool + 是否必须使用代理 + batch_size : int + 批次大小(每批处理多少邮箱) + batch_delay : float + 每批次之间的延迟(秒) + """ + name: str = "default" + max_workers: int = 10 + min_delay: float = 1.0 + max_delay: float = 3.0 + max_retries: int = 3 + retry_backoff: float = 2.0 + timeout: float = 30.0 + use_proxy: bool = False + batch_size: int = 20 + batch_delay: float = 60.0 + + def get_delay(self) -> float: + """返回随机延迟时间""" + return random.uniform(self.min_delay, self.max_delay) + + def get_retry_delay(self, attempt: int) -> float: + """返回重试延迟(指数退避)""" + return self.retry_backoff * (2 ** (attempt - 1)) + random.uniform(0, 1) + + def should_wait_after_batch(self, processed_count: int) -> bool: + """判断是否需要在批次后等待""" + return processed_count >= self.batch_size + + +PROVIDER_STRATEGIES: Dict[str, ProviderStrategy] = { + "gmail": ProviderStrategy( + name="Gmail", + max_workers=5, + min_delay=2.0, + max_delay=5.0, + max_retries=5, + retry_backoff=3.0, + timeout=45.0, + use_proxy=False, + batch_size=10, + batch_delay=120.0, + ), + "yahoo": ProviderStrategy( + name="Yahoo", + max_workers=3, + min_delay=3.0, + max_delay=8.0, + max_retries=5, + retry_backoff=4.0, + timeout=60.0, + use_proxy=True, + batch_size=5, + batch_delay=180.0, + ), + "gmx": ProviderStrategy( + name="GMX", + max_workers=8, + min_delay=1.5, + max_delay=4.0, + max_retries=8, + retry_backoff=2.5, + timeout=45.0, + use_proxy=True, + batch_size=15, + batch_delay=90.0, + ), + "outlook": ProviderStrategy( + name="Outlook/Microsoft", + max_workers=3, + min_delay=5.0, + max_delay=10.0, + max_retries=3, + retry_backoff=5.0, + timeout=60.0, + use_proxy=False, + batch_size=5, + batch_delay=300.0, + ), + "163": ProviderStrategy( + name="163", + max_workers=5, + min_delay=2.0, + max_delay=5.0, + max_retries=3, + retry_backoff=3.0, + timeout=30.0, + use_proxy=False, + batch_size=10, + batch_delay=120.0, + ), + "rambler": ProviderStrategy( + name="Rambler", + max_workers=8, + min_delay=1.0, + max_delay=3.0, + max_retries=5, + retry_backoff=2.0, + timeout=30.0, + use_proxy=False, + batch_size=20, + batch_delay=60.0, + ), + "naver": ProviderStrategy( + name="Naver", + max_workers=5, + min_delay=2.0, + max_delay=4.0, + max_retries=3, + retry_backoff=2.5, + timeout=30.0, + use_proxy=False, + batch_size=10, + batch_delay=90.0, + ), + "onet": ProviderStrategy( + name="Onet", + max_workers=6, + min_delay=1.5, + max_delay=3.5, + max_retries=4, + retry_backoff=2.0, + timeout=35.0, + use_proxy=False, + batch_size=15, + batch_delay=75.0, + ), + "web_de": ProviderStrategy( + name="Web.de", + max_workers=6, + min_delay=2.0, + max_delay=5.0, + max_retries=5, + retry_backoff=3.0, + timeout=40.0, + use_proxy=True, + batch_size=12, + batch_delay=100.0, + ), + "inbox_lv": ProviderStrategy( + name="Inbox.lv", + max_workers=8, + min_delay=1.0, + max_delay=2.5, + max_retries=3, + retry_backoff=1.5, + timeout=25.0, + use_proxy=False, + batch_size=20, + batch_delay=50.0, + ), + "sina": ProviderStrategy( + name="Sina", + max_workers=5, + min_delay=2.0, + max_delay=5.0, + max_retries=3, + retry_backoff=2.5, + timeout=30.0, + use_proxy=False, + batch_size=10, + batch_delay=120.0, + ), + "pissmail": ProviderStrategy( + name="Pissmail (临时邮箱)", + max_workers=15, + min_delay=0.5, + max_delay=1.5, + max_retries=2, + retry_backoff=1.0, + timeout=20.0, + use_proxy=False, + batch_size=30, + batch_delay=30.0, + ), + "default": ProviderStrategy( + name="默认策略", + max_workers=10, + min_delay=1.0, + max_delay=3.0, + max_retries=3, + retry_backoff=2.0, + timeout=30.0, + use_proxy=False, + batch_size=20, + batch_delay=60.0, + ), +} + + +def get_provider_key(login: str) -> str: + """ + 根据邮箱地址确定供应商策略键。 + + Parameters + ---------- + login : str + 邮箱地址 + + Returns + ------- + str + 供应商策略键(如 'gmail', 'yahoo', 'gmx' 等) + """ + login_lower = login.lower() + + if DOMAIN_GMAIL in login_lower: + return "gmail" + + if DOMAIN_YAHOO in login_lower: + return "yahoo" + + if any(domain in login_lower for domain in GMX_DOMAINS): + return "gmx" + + if DOMAIN_HOTMAIL in login_lower or DOMAIN_OUTLOOK_COM in login_lower: + return "outlook" + + if DOMAIN_163 in login_lower: + return "163" + + if DOMAIN_RAMBLER_RU in login_lower: + return "rambler" + + if DOMAIN_NAVER in login_lower: + return "naver" + + if DOMAIN_ONET in login_lower: + return "onet" + + if DOMAIN_GAZETA_PL in login_lower: + return "onet" + + if DOMAIN_WEB_DE in login_lower: + return "web_de" + + if DOMAIN_INBOX_LV in login_lower: + return "inbox_lv" + + if DOMAIN_SINA in login_lower: + return "sina" + + pissmail_domains = [ + DOMAIN_PISS_MAIL, DOMAIN_INCEL_EMAIL, DOMAIN_SHITPOSTING_EXPERT, + DOMAIN_HATESJE_WS, DOMAIN_CHILD_PIZZA, DOMAIN_GENOCIDE_FUN, DOMAIN_DMC_CHAT, + ] + if any(domain in login_lower for domain in pissmail_domains): + return "pissmail" + + return "default" + + +def get_strategy(login: str) -> ProviderStrategy: + """ + 根据邮箱地址获取对应的风控策略。 + + Parameters + ---------- + login : str + 邮箱地址 + + Returns + ------- + ProviderStrategy + 对应的风控策略 + """ + key = get_provider_key(login) + return PROVIDER_STRATEGIES.get(key, PROVIDER_STRATEGIES["default"]) + + +def group_mails_by_provider(mail_list: List) -> Dict[str, List]: + """ + 将邮箱列表按供应商分组。 + + Parameters + ---------- + mail_list : List + 邮箱对象列表(需要有 .mail 属性) + + Returns + ------- + Dict[str, List] + 分组后的邮箱字典 {provider_key: [mail_objects]} + """ + grouped: Dict[str, List] = {} + for mail in mail_list: + key = get_provider_key(mail.mail) + if key not in grouped: + grouped[key] = [] + grouped[key].append(mail) + return grouped + + +def apply_delay(strategy: ProviderStrategy) -> None: + """应用随机延迟""" + delay = strategy.get_delay() + time.sleep(delay) + + +def apply_batch_delay(strategy: ProviderStrategy) -> None: + """应用批次延迟""" + time.sleep(strategy.batch_delay) + + +class RateLimiter: + """ + 简单的速率限制器,用于跟踪和限制每个供应商的请求频率。 + """ + + def __init__(self): + self._request_counts: Dict[str, int] = {} + self._last_batch_time: Dict[str, float] = {} + + def record_request(self, provider_key: str) -> None: + """记录一次请求""" + self._request_counts[provider_key] = self._request_counts.get(provider_key, 0) + 1 + + def should_wait(self, provider_key: str, strategy: ProviderStrategy) -> bool: + """判断是否需要等待""" + count = self._request_counts.get(provider_key, 0) + return strategy.should_wait_after_batch(count) + + def wait_if_needed(self, provider_key: str, strategy: ProviderStrategy) -> None: + """如果需要,执行等待""" + if self.should_wait(provider_key, strategy): + apply_batch_delay(strategy) + self._request_counts[provider_key] = 0 + + def reset(self, provider_key: str) -> None: + """重置计数器""" + self._request_counts[provider_key] = 0 + + +if __name__ == "__main__": + test_emails = [ + "user@gmail.com", + "user@yahoo.com", + "user@gmx.de", + "user@gmx.fr", + "user@outlook.com", + "user@hotmail.com", + "user@163.com", + "user@rambler.ru", + "user@naver.com", + "user@onet.pl", + "user@web.de", + "user@inbox.lv", + "user@sina.com", + "user@pissmail.com", + "user@unknown.com", + ] + + print("邮箱供应商策略测试:") + print("=" * 70) + for email in test_emails: + strategy = get_strategy(email) + print(f"{email:30} → {strategy.name:15} (max_workers={strategy.max_workers}, " + f"delay={strategy.min_delay}-{strategy.max_delay}s)") \ No newline at end of file