Try to optimize mail reading

This commit is contained in:
2026-04-25 11:31:56 +02:00
parent f6991f394d
commit 1175e6c75c
4 changed files with 839 additions and 16 deletions
+346
View File
@@ -0,0 +1,346 @@
"""
imap_fingerprint.py
===================
IMAP 客户端指纹伪装(IMAP ID 命令,RFC 2971)。
IMAP ID 命令允许客户端向服务器发送标识信息,服务器可能用它来:
- 统计客户端类型
- 提供不同的功能或限制
- 进行风控检测
通过伪装常见邮件客户端的指纹,可以避免被识别为自动化脚本。
支持的客户端指纹:
- Thunderbird (Mozilla)
- Microsoft Outlook
- Apple Mail
- Gmail (通过 IMAP)
- Yahoo Mail
- GMX Mail Client
- 通用 IMAP 客户端
"""
import random
from dataclasses import dataclass
from typing import Dict, List, Optional
from src.mail.mail_constants import (
DOMAIN_YAHOO, DOMAIN_GMX, DOMAIN_GMX_DE, DOMAIN_GMX_NET,
DOMAIN_GMX_FR, DOMAIN_GMX_AT, DOMAIN_GMX_CH, DOMAIN_GMX_US,
DOMAIN_GMX_PT, DOMAIN_GMX_SG, DOMAIN_GMAIL, DOMAIN_HOTMAIL,
DOMAIN_OUTLOOK_COM, DOMAIN_WEB_DE, DOMAIN_163, DOMAIN_RAMBLER_RU,
DOMAIN_NAVER, DOMAIN_ONET, DOMAIN_SINA,
)
@dataclass
class ImapFingerprint:
    """
    Client identity advertised through the IMAP ID command (RFC 2971).

    Attributes
    ----------
    name : str
        Client name.
    version : str
        Client version.
    vendor : str
        Vendor name (optional).
    support_email : str
        Support e-mail address (optional).
    os : str
        Operating system (optional).
    os_version : str
        Operating system version (optional).
    """

    name: str
    version: str
    vendor: str = ""
    support_email: str = ""
    os: str = ""
    os_version: str = ""

    def to_id_params(self) -> Dict[str, str]:
        """Return the ID field/value pairs, omitting optional fields that are empty."""
        params = {
            "name": self.name,
            "version": self.version,
        }
        optional_fields = (
            ("vendor", self.vendor),
            ("support-email", self.support_email),
            ("os", self.os),
            ("os-version", self.os_version),
        )
        for field_name, field_value in optional_fields:
            if field_value:
                params[field_name] = field_value
        return params

    def to_id_string(self) -> str:
        """
        Return the ID command argument as text, prefixed with one space.

        The argument is a parenthesised list of quoted field/value pairs, as
        RFC 2971 requires, so that ``"ID" + to_id_string()`` yields
        ``ID ("name" "..." "version" "...")``. Backslashes and double quotes
        inside a value are escaped so they cannot terminate the quoted string.
        """

        def quoted(text: str) -> str:
            # Escape the backslash first so the escapes added for quotes survive.
            return '"' + text.replace("\\", "\\\\").replace('"', '\\"') + '"'

        pairs = " ".join(
            f"{quoted(key)} {quoted(value)}"
            for key, value in self.to_id_params().items()
        )
        return f" ({pairs})"
# Candidate version strings per client; one is drawn at random when the
# fingerprint pools are built.
THUNDERBIRD_VERSIONS = [
    # 115.x
    "115.10.1",
    "115.9.1",
    "115.8.1",
    "115.7.0",
    "115.6.0",
    # 102.x
    "102.15.1",
    "102.14.0",
    "102.13.0",
    "102.12.0",
    # 91.x
    "91.13.1",
    "91.12.0",
    "91.11.0",
]

OUTLOOK_VERSIONS = [
    # 16.0.x
    "16.0.17126.20132",
    "16.0.16827.20166",
    "16.0.16724.20182",
    "16.0.16626.20164",
    "16.0.16529.20154",
    "16.0.16425.20122",
    # 15.0.x
    "15.0.5153.1000",
    "15.0.5041.1000",
    "15.0.4937.1000",
    # 14.0.x
    "14.0.7232.5000",
    "14.0.7172.5000",
]

APPLE_MAIL_VERSIONS = [
    # short "major.minor" style
    "16.0",
    "15.0",
    "14.0",
    "13.0",
    "12.0",
    "11.0",
    # long dotted style
    "3736.500.121.1.1",
    "3736.400.56",
    "3731.600.57",
]

GMX_VERSIONS = [
    "7.5.1",
    "7.5.0",
    "7.4.2",
    "7.4.1",
    "7.4.0",
    "7.3.5",
    "7.3.4",
    "7.3.3",
    "7.3.2",
    "7.3.1",
]

YAHOO_MAIL_VERSIONS = [
    "2.9.0",
    "2.8.5",
    "2.8.0",
    "2.7.5",
    "2.7.0",
    "1.0.0",
]

GMAIL_IMAP_VERSIONS = [
    "2.1.6",
    "2.1.5",
    "2.1.4",
    "2.1.3",
    "2.1.2",
    "2.1.1",
    "2.1.0",
]

# Operating-system labels used for the "os-version" ID field.
WINDOWS_VERSIONS = [
    "Windows 10",
    "Windows 11",
    "Windows 8.1",
    "Windows 7",
]
MACOS_VERSIONS = [
    "macOS 14.4",
    "macOS 14.3",
    "macOS 13.6",
    "macOS 13.5",
    "macOS 12.7",
]
LINUX_VERSIONS = [
    "Ubuntu 22.04",
    "Ubuntu 20.04",
    "Debian 12",
    "Fedora 39",
]
def _sample_fingerprints(name, versions, vendor, support_email,
                         os="", os_versions=None, count=3):
    """Build ``count`` fingerprints for one client with randomly drawn versions.

    For every fingerprint the client version is drawn first and, when
    ``os_versions`` is given, the OS version second.
    """
    samples = []
    for _ in range(count):
        version = random.choice(versions)
        os_version = random.choice(os_versions) if os_versions is not None else ""
        samples.append(
            ImapFingerprint(
                name=name,
                version=version,
                vendor=vendor,
                support_email=support_email,
                os=os,
                os_version=os_version,
            )
        )
    return samples


# Fingerprint pools keyed by client profile. The random versions are drawn
# once, when this module is imported, so each pool is fixed for the lifetime
# of the process.
DEFAULT_FINGERPRINTS: Dict[str, List[ImapFingerprint]] = {
    "thunderbird_windows": _sample_fingerprints(
        "Thunderbird", THUNDERBIRD_VERSIONS, "Mozilla", "tb-feedback@mozilla.org",
        os="Windows", os_versions=WINDOWS_VERSIONS,
    ),
    "thunderbird_mac": _sample_fingerprints(
        "Thunderbird", THUNDERBIRD_VERSIONS, "Mozilla", "tb-feedback@mozilla.org",
        os="MacOS", os_versions=MACOS_VERSIONS,
    ),
    "outlook": _sample_fingerprints(
        "Microsoft Outlook", OUTLOOK_VERSIONS, "Microsoft", "outlook@microsoft.com",
        os="Windows", os_versions=WINDOWS_VERSIONS,
    ),
    "apple_mail": _sample_fingerprints(
        "Apple Mail", APPLE_MAIL_VERSIONS, "Apple", "mail@apple.com",
        os="MacOS", os_versions=MACOS_VERSIONS,
    ),
    # The provider-specific clients below carry no OS information.
    "gmx_client": _sample_fingerprints(
        "GMX Mail", GMX_VERSIONS, "GMX", "support@gmx.com",
    ),
    "yahoo_client": _sample_fingerprints(
        "YahooMailIMAP", YAHOO_MAIL_VERSIONS, "Yahoo", "imap-support@yahoo.com",
    ),
    "gmail_imap": _sample_fingerprints(
        "GmailIMAP", GMAIL_IMAP_VERSIONS, "Google", "imap-support@google.com",
    ),
    # Single fixed entry, used as the fallback pool.
    "generic": [
        ImapFingerprint(name="IMAPClient", version="1.0.0", vendor="Generic"),
    ],
}
# Mail domain -> fingerprint pool keys considered plausible for that provider.
# Insertion order matters: lookups stop at the first domain that matches.
# Every domain gets its own list object.
PROVIDER_FINGERPRINT_MAP: Dict[str, List[str]] = {
    DOMAIN_GMAIL: ["gmail_imap", "thunderbird_windows", "thunderbird_mac", "apple_mail"],
    DOMAIN_YAHOO: ["yahoo_client", "thunderbird_windows", "thunderbird_mac"],
    # Microsoft consumer domains
    **{
        ms_domain: ["outlook", "thunderbird_windows"]
        for ms_domain in (DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM)
    },
    # GMX domains, plus web.de which is given the same profile
    **{
        gmx_domain: ["gmx_client", "thunderbird_windows", "thunderbird_mac"]
        for gmx_domain in (
            DOMAIN_GMX,
            DOMAIN_GMX_DE,
            DOMAIN_GMX_NET,
            DOMAIN_GMX_FR,
            DOMAIN_GMX_AT,
            DOMAIN_GMX_CH,
            DOMAIN_GMX_US,
            DOMAIN_GMX_PT,
            DOMAIN_GMX_SG,
            DOMAIN_WEB_DE,
        )
    },
    **{
        plain_domain: ["thunderbird_windows", "generic"]
        for plain_domain in (DOMAIN_163, DOMAIN_RAMBLER_RU)
    },
    **{
        mixed_domain: ["thunderbird_windows", "thunderbird_mac", "generic"]
        for mixed_domain in (DOMAIN_NAVER, DOMAIN_ONET)
    },
    DOMAIN_SINA: ["thunderbird_windows", "generic"],
}
def get_fingerprint_for_provider(login: str) -> ImapFingerprint:
    """
    Pick a fingerprint suited to the provider of the given mailbox.

    Parameters
    ----------
    login : str
        E-mail address; matched case-insensitively.

    Returns
    -------
    ImapFingerprint
        A fingerprint drawn at random from a pool allowed for the first
        domain in ``PROVIDER_FINGERPRINT_MAP`` that occurs as a substring of
        the address, or from the ``"generic"`` pool when none matches.
    """
    needle = login.lower()
    candidate_keys = next(
        (keys for domain, keys in PROVIDER_FINGERPRINT_MAP.items() if domain in needle),
        ["generic"],
    )
    pool = DEFAULT_FINGERPRINTS.get(
        random.choice(candidate_keys),
        DEFAULT_FINGERPRINTS["generic"],
    )
    return random.choice(pool)
def get_random_fingerprint() -> ImapFingerprint:
    """
    Pick a fingerprint at random, regardless of provider.

    Returns
    -------
    ImapFingerprint
        A fingerprint from a randomly chosen pool; the ``"generic"`` pool is
        not among the candidates.
    """
    candidate_keys = (
        "thunderbird_windows",
        "thunderbird_mac",
        "outlook",
        "apple_mail",
        "gmx_client",
        "yahoo_client",
        "gmail_imap",
    )
    pool = DEFAULT_FINGERPRINTS.get(
        random.choice(candidate_keys),
        DEFAULT_FINGERPRINTS["generic"],
    )
    return random.choice(pool)
def send_imap_id(imap_client, fingerprint: Optional["ImapFingerprint"] = None) -> bool:
    """
    Send the IMAP ID command (RFC 2971) to advertise a client identity.

    Parameters
    ----------
    imap_client : IMAPClient or imaplib.IMAP4
        Connected client. An object exposing ``id_`` (IMAPClient) is given the
        parameters as a dict; otherwise an object exposing ``xatom``
        (imaplib.IMAP4) is used to issue a properly tagged ``ID (...)``
        command.
    fingerprint : ImapFingerprint, optional
        Identity to advertise; a random one is chosen when omitted.

    Returns
    -------
    bool
        True when the command was sent and, on the ``xatom`` path, the server
        answered ``OK``. False when the client offers neither method, the
        server refused, or any exception occurred. This function never raises:
        identification is best-effort.
    """
    import logging

    # Same logger object as the module-level one (same name); looking it up
    # here avoids depending on the order in which the module globals are bound.
    log = logging.getLogger(__name__)

    def quoted(text) -> str:
        # IMAP quoted string: escape the backslash first, then double quotes.
        return '"' + str(text).replace("\\", "\\\\").replace('"', '\\"') + '"'

    if fingerprint is None:
        fingerprint = get_random_fingerprint()
    try:
        if hasattr(imap_client, 'id_'):
            imap_client.id_(fingerprint.to_id_params())
            log.info(f"发送 IMAP ID 命令成功: {fingerprint.name} v{fingerprint.version}")
            return True
        if hasattr(imap_client, 'xatom'):
            # Writing "ID ..." straight to the socket would omit the command
            # tag and the parentheses RFC 2971 requires; xatom adds the tag
            # and waits for the tagged completion response.
            id_list = "(" + " ".join(
                f"{quoted(key)} {quoted(value)}"
                for key, value in fingerprint.to_id_params().items()
            ) + ")"
            status, payload = imap_client.xatom("ID", id_list)
            if status != "OK":
                log.warning(f"发送 IMAP ID 命令失败: {status} {payload}")
                return False
            log.info(f"发送 IMAP ID 命令成功 (原生): {fingerprint.name} v{fingerprint.version}")
            return True
        log.warning("IMAP 客户端不支持 ID 命令")
        return False
    except Exception as e:
        log.warning(f"发送 IMAP ID 命令失败: {e}")
        return False
# Placeholder; replaced by init_logger() immediately below.
logger = None


def init_logger():
    """Bind the module-level ``logger`` to an INFO-level logger that writes to stdout."""
    import logging
    import sys

    global logger
    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.INFO)
    module_logger.addHandler(stdout_handler)
    logger = module_logger


# Runs once, at import time.
init_logger()
if __name__ == "__main__":
    # Manual smoke test: print the fingerprint picked for a few sample
    # addresses, then ten provider-independent random picks.
    sample_addresses = (
        "user@gmail.com",
        "user@yahoo.com",
        "user@gmx.de",
        "user@outlook.com",
        "user@hotmail.com",
        "user@163.com",
        "user@web.de",
        "user@unknown.com",
    )
    separator = "=" * 70

    print("\nIMAP 指纹伪装测试:")
    print(separator)
    for address in sample_addresses:
        fp = get_fingerprint_for_provider(address)
        print(f"{address:30}{fp.name:20} v{fp.version:15} ({fp.os or 'N/A'})")

    print("\n随机指纹样本:")
    print(separator)
    for _ in range(10):
        fp = get_random_fingerprint()
        print(f" {fp.name:20} v{fp.version:15} vendor={fp.vendor:10} os={fp.os or 'N/A'}")
+18 -4
View File
@@ -33,6 +33,8 @@ import socks
from dotenv import load_dotenv from dotenv import load_dotenv
from imapclient import IMAPClient from imapclient import IMAPClient
from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id
load_dotenv() load_dotenv()
# ────────────────────────────────────────────────────────────── # ──────────────────────────────────────────────────────────────
@@ -221,6 +223,8 @@ class ProxyIMAPClient(IMAPClient):
Accessibles via ``client.subjects``. Accessibles via ``client.subjects``.
Utilisés par ``search_by_subjects()`` pour construire Utilisés par ``search_by_subjects()`` pour construire
automatiquement les critères IMAP SUBJECT. automatiquement les critères IMAP SUBJECT.
fingerprint : ImapFingerprint, optional
IMAP 客户端指纹伪装(自动根据邮箱地址选择)。
""" """
def __init__( def __init__(
@@ -228,11 +232,12 @@ class ProxyIMAPClient(IMAPClient):
host: str, host: str,
proxy: ProxyConfig, proxy: ProxyConfig,
subjects: Optional[List[str]] = None, subjects: Optional[List[str]] = None,
fingerprint=None,
**kwargs, **kwargs,
): ):
self._proxy = proxy self._proxy = proxy
# Sujets à rechercher, injectables depuis l'extérieur
self.subjects: List[str] = list(subjects) if subjects else [] self.subjects: List[str] = list(subjects) if subjects else []
self._fingerprint = fingerprint
super().__init__(host, **kwargs) super().__init__(host, **kwargs)
def _create_IMAP4(self): def _create_IMAP4(self):
@@ -382,10 +387,14 @@ class ProxyMailReader:
# ── Connexion ──────────────────────────────────────────── # ── Connexion ────────────────────────────────────────────
def _connect(self) -> ProxyIMAPClient: def _connect(self, login_email: str = None) -> ProxyIMAPClient:
imap_server = get_imap_server(self.account.login) imap_server = get_imap_server(self.account.login)
last_exc: Optional[Exception] = None last_exc: Optional[Exception] = None
fingerprint = None
if login_email:
fingerprint = get_fingerprint_for_provider(login_email)
for attempt in range(1, self.max_retries + 1): for attempt in range(1, self.max_retries + 1):
logger.info( logger.info(
"[%s] Tentative %d/%d — Connexion via %s%s:993", "[%s] Tentative %d/%d — Connexion via %s%s:993",
@@ -397,11 +406,16 @@ class ProxyMailReader:
host=imap_server, host=imap_server,
proxy=self.proxy, proxy=self.proxy,
subjects=self._subjects, subjects=self._subjects,
fingerprint=fingerprint,
use_uid=True, use_uid=True,
ssl=True, ssl=True,
timeout=self.timeout, timeout=self.timeout,
) )
client.login(self.account.login, self.account.password) client.login(self.account.login, self.account.password)
if fingerprint:
send_imap_id(client, fingerprint)
logger.info( logger.info(
"[%s] Connecté (tentative %d). Sujets recherchés : %s", "[%s] Connecté (tentative %d). Sujets recherchés : %s",
self.account.login, attempt, self._subjects, self.account.login, attempt, self._subjects,
@@ -531,8 +545,8 @@ class ProxyMailReader:
] ]
all_results: List[MailResult] = [] all_results: List[MailResult] = []
seen_message_ids: set = set() # déduplication inter-dossiers seen_message_ids: set = set()
client = self._connect() client = self._connect(login_email=self.account.login)
try: try:
folders = self._list_folders(client) folders = self._list_folders(client)
+68 -12
View File
@@ -2,6 +2,7 @@ import datetime
import email import email
import logging import logging
import sys import sys
import time
from builtins import list from builtins import list
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from email.header import decode_header from email.header import decode_header
@@ -11,9 +12,13 @@ from imapclient import IMAPClient
from src.db.mirgration.migration_tools import migre_accepted_appointment from src.db.mirgration.migration_tools import migre_accepted_appointment
from src.db.mongo_manager import MONGO_STORE_MANAGER from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id
from src.mail.imap_proxy_reader import ProxyMailReader, MailAccount, ProxyConfig from src.mail.imap_proxy_reader import ProxyMailReader, MailAccount, ProxyConfig
from src.mail.mail_constants import create_imap, show_folders, is_gmx_address, is_yahoo_address from src.mail.mail_constants import create_imap, show_folders, is_gmx_address, is_yahoo_address
from src.mail.mail_reader import get_gmx_proxy_config, get_yahoo_proxy_config from src.mail.mail_reader import get_gmx_proxy_config, get_yahoo_proxy_config
from src.mail.provider_strategy import (
get_strategy, group_mails_by_provider, apply_delay, ProviderStrategy,
)
from src.notification.AcceptedResultPojo import get_accepted_result_from from src.notification.AcceptedResultPojo import get_accepted_result_from
from src.notification.mailer import Mailer from src.notification.mailer import Mailer
from src.pojo.ResultEnum import ResultEnum from src.pojo.ResultEnum import ResultEnum
@@ -53,14 +58,18 @@ class MailConfirmationReader():
def read_emails(self, mails_messages: list) -> list: def read_emails(self, mails_messages: list) -> list:
imap = create_imap(self.login) imap = create_imap(self.login)
isImapClient = isinstance(imap, IMAPClient) isImapClient = isinstance(imap, IMAPClient)
# authenticate
fingerprint = get_fingerprint_for_provider(self.login)
if isImapClient: if isImapClient:
# authenticate
dat = imap.login(self.login, str(self.password)) dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(dat, self.login)) print("type is {} for {}".format(dat, self.login))
send_imap_id(imap, fingerprint)
else: else:
responseType, dat = imap.login(self.login, str(self.password)) responseType, dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(responseType, self.login)) print("type is {} for {}".format(responseType, self.login))
send_imap_id(imap, fingerprint)
mail_list = [] mail_list = []
print("read mails from {}".format(self.login)) print("read mails from {}".format(self.login))
if isImapClient: if isImapClient:
@@ -210,20 +219,67 @@ def find_confirmation_contacts_for_today(mode: str = 'default'):
return _mail_list_to_scan return _mail_list_to_scan
def read_mail_with_strategy(mail, mails_messages, strategy: ProviderStrategy, proxy_config=None, subjects=None):
    """Read one mailbox after the strategy's delay, appending results to ``mails_messages``.

    Without ``proxy_config`` the mailbox is read directly with
    ``MailConfirmationReader``. With it, a ``ProxyMailReader`` configured with
    the strategy's retry settings is used, and each result is converted to a
    ``MailPojo`` before being appended.
    """
    apply_delay(strategy)

    if proxy_config is None:
        # Direct path: the reader appends to mails_messages itself.
        MailConfirmationReader(mail.mail, mail.password).read_emails(mails_messages)
        return

    proxy_reader = ProxyMailReader(
        MailAccount(login=mail.mail, password=mail.password),
        proxy_config,
        subjects=subjects,
        max_retries=strategy.max_retries,
        retry_delay=strategy.retry_backoff,
    )
    for found in proxy_reader.read(since=datetime.datetime.today()):
        pojo = MailPojo(subject=found.subject, body=found.body, from_address=found.from_address)
        pojo.mail_address = mail.mail
        # Fall back to the mailbox address when the result has no recipient.
        pojo.to_address = found.to_address or mail.mail
        pojo.isImapClient = True
        mails_messages.append(pojo)
def find_confirmation_contacts_mail_list(mail_list, subjects: list = None): def find_confirmation_contacts_mail_list(mail_list, subjects: list = None):
mails_messages = [] mails_messages = []
gmx_proxy_config = get_gmx_proxy_config() gmx_proxy_config = get_gmx_proxy_config()
yahoo_proxy_config = get_yahoo_proxy_config() yahoo_proxy_config = get_yahoo_proxy_config()
# read all the emails
with ThreadPoolExecutor(max_workers=100) as executor: grouped_mails = group_mails_by_provider(mail_list)
for mail in mail_list:
if is_gmx_address(mail.mail) and gmx_proxy_config is not None: for provider_key, provider_mail_list in grouped_mails.items():
executor.submit(read_gmx_proxy_confirmation_emails, mail, mails_messages, gmx_proxy_config, subjects) strategy = get_strategy(provider_key) if provider_key in ["gmail", "yahoo", "gmx", "outlook", "163", "rambler", "naver", "onet", "web_de", "inbox_lv", "sina", "pissmail", "default"] else get_strategy(provider_mail_list[0].mail)
elif is_yahoo_address(mail.mail) and yahoo_proxy_config is not None: print(f"[{strategy.name}] 处理 {len(provider_mail_list)} 个邮箱 (max_workers={strategy.max_workers})")
executor.submit(read_gmx_proxy_confirmation_emails, mail, mails_messages, yahoo_proxy_config, subjects)
else: with ThreadPoolExecutor(max_workers=strategy.max_workers) as executor:
mail_reader = MailConfirmationReader(mail.mail, mail.password) futures = []
executor.submit(mail_reader.read_emails, mails_messages) processed = 0
for mail in provider_mail_list:
proxy_config = None
if is_gmx_address(mail.mail) and gmx_proxy_config is not None:
proxy_config = gmx_proxy_config
elif is_yahoo_address(mail.mail) and yahoo_proxy_config is not None:
proxy_config = yahoo_proxy_config
future = executor.submit(
read_mail_with_strategy, mail, mails_messages, strategy, proxy_config, subjects
)
futures.append(future)
processed += 1
if strategy.should_wait_after_batch(processed):
time.sleep(strategy.batch_delay)
processed = 0
for future in futures:
try:
future.result(timeout=strategy.timeout)
except Exception as e:
print(f"读取邮箱出错: {e}")
accepted_appointment_list = [] accepted_appointment_list = []
if len(mails_messages) > 0: if len(mails_messages) > 0:
successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
+407
View File
@@ -0,0 +1,407 @@
"""
provider_strategy.py
====================
不同邮箱供应商的风控策略配置。
每个供应商有不同的风控规则,需要针对性地调整:
- 并发限制
- 请求延迟
- 重试策略
- 超时时间
- 是否使用代理
"""
import random
import time
from dataclasses import dataclass
from typing import Dict, List
from src.mail.mail_constants import (
DOMAIN_163, DOMAIN_YAHOO, DOMAIN_GMAIL, DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM,
DOMAIN_WEB_DE, DOMAIN_RAMBLER_RU, DOMAIN_NAVER, DOMAIN_ONET,
DOMAIN_GAZETA_PL, DOMAIN_INBOX_LV, DOMAIN_SINA, DOMAIN_PISS_MAIL, DOMAIN_INCEL_EMAIL,
DOMAIN_SHITPOSTING_EXPERT, DOMAIN_HATESJE_WS, DOMAIN_CHILD_PIZZA,
DOMAIN_GENOCIDE_FUN, DOMAIN_DMC_CHAT, GMX_DOMAINS,
)
@dataclass
class ProviderStrategy:
    """
    Throttling settings for one mail provider.

    Attributes
    ----------
    name : str
        Provider name (used in log output).
    max_workers : int
        Maximum number of concurrent threads for this provider.
    min_delay : float
        Lower bound of the random delay before each operation (seconds).
    max_delay : float
        Upper bound of the random delay before each operation (seconds).
    max_retries : int
        Maximum number of retries after a failed connection.
    retry_backoff : float
        Base of the exponential retry back-off (seconds).
    timeout : float
        Connection timeout (seconds).
    use_proxy : bool
        Whether a proxy is mandatory.
    batch_size : int
        Number of mailboxes handled per batch.
    batch_delay : float
        Pause between batches (seconds).
    """

    name: str = "default"
    max_workers: int = 10
    min_delay: float = 1.0
    max_delay: float = 3.0
    max_retries: int = 3
    retry_backoff: float = 2.0
    timeout: float = 30.0
    use_proxy: bool = False
    batch_size: int = 20
    batch_delay: float = 60.0

    def get_delay(self) -> float:
        """Return a random delay between ``min_delay`` and ``max_delay``."""
        lower, upper = self.min_delay, self.max_delay
        return random.uniform(lower, upper)

    def get_retry_delay(self, attempt: int) -> float:
        """Return the back-off for ``attempt``: base doubled per attempt, plus up to 1 s of jitter."""
        exponential_part = self.retry_backoff * (2 ** (attempt - 1))
        jitter = random.uniform(0, 1)
        return exponential_part + jitter

    def should_wait_after_batch(self, processed_count: int) -> bool:
        """Return True once ``processed_count`` has reached ``batch_size``."""
        return self.batch_size <= processed_count
# Per-provider throttling settings, keyed by provider key.
# Row layout (positional ProviderStrategy fields):
#   name, max_workers, min_delay, max_delay, max_retries,
#   retry_backoff, timeout, use_proxy, batch_size, batch_delay
PROVIDER_STRATEGIES: Dict[str, ProviderStrategy] = {
    provider_key: ProviderStrategy(*settings)
    for provider_key, settings in (
        ("gmail",    ("Gmail",             5,  2.0, 5.0,  5, 3.0, 45.0, False, 10, 120.0)),
        ("yahoo",    ("Yahoo",             3,  3.0, 8.0,  5, 4.0, 60.0, True,  5,  180.0)),
        ("gmx",      ("GMX",               8,  1.5, 4.0,  8, 2.5, 45.0, True,  15, 90.0)),
        ("outlook",  ("Outlook/Microsoft", 3,  5.0, 10.0, 3, 5.0, 60.0, False, 5,  300.0)),
        ("163",      ("163",               5,  2.0, 5.0,  3, 3.0, 30.0, False, 10, 120.0)),
        ("rambler",  ("Rambler",           8,  1.0, 3.0,  5, 2.0, 30.0, False, 20, 60.0)),
        ("naver",    ("Naver",             5,  2.0, 4.0,  3, 2.5, 30.0, False, 10, 90.0)),
        ("onet",     ("Onet",              6,  1.5, 3.5,  4, 2.0, 35.0, False, 15, 75.0)),
        ("web_de",   ("Web.de",            6,  2.0, 5.0,  5, 3.0, 40.0, True,  12, 100.0)),
        ("inbox_lv", ("Inbox.lv",          8,  1.0, 2.5,  3, 1.5, 25.0, False, 20, 50.0)),
        ("sina",     ("Sina",              5,  2.0, 5.0,  3, 2.5, 30.0, False, 10, 120.0)),
        ("pissmail", ("Pissmail (临时邮箱)", 15, 0.5, 1.5,  2, 1.0, 20.0, False, 30, 30.0)),
        # Fallback for providers without an entry of their own.
        ("default",  ("默认策略",           10, 1.0, 3.0,  3, 2.0, 30.0, False, 20, 60.0)),
    )
}
def get_provider_key(login: str) -> str:
    """
    Map an e-mail address to its provider strategy key.

    Parameters
    ----------
    login : str
        E-mail address; matched case-insensitively.

    Returns
    -------
    str
        Key of the first rule with a domain that occurs as a substring of the
        address (e.g. 'gmail', 'yahoo', 'gmx'), or 'default' when no rule
        matches.
    """
    haystack = login.lower()

    # Rules are tried top to bottom; the first hit wins.
    ordered_rules = (
        ("gmail", (DOMAIN_GMAIL,)),
        ("yahoo", (DOMAIN_YAHOO,)),
        ("gmx", GMX_DOMAINS),
        ("outlook", (DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM)),
        ("163", (DOMAIN_163,)),
        ("rambler", (DOMAIN_RAMBLER_RU,)),
        ("naver", (DOMAIN_NAVER,)),
        # gazeta.pl addresses use the Onet strategy.
        ("onet", (DOMAIN_ONET, DOMAIN_GAZETA_PL)),
        ("web_de", (DOMAIN_WEB_DE,)),
        ("inbox_lv", (DOMAIN_INBOX_LV,)),
        ("sina", (DOMAIN_SINA,)),
        ("pissmail", (
            DOMAIN_PISS_MAIL,
            DOMAIN_INCEL_EMAIL,
            DOMAIN_SHITPOSTING_EXPERT,
            DOMAIN_HATESJE_WS,
            DOMAIN_CHILD_PIZZA,
            DOMAIN_GENOCIDE_FUN,
            DOMAIN_DMC_CHAT,
        )),
    )
    for provider_key, domains in ordered_rules:
        if any(domain in haystack for domain in domains):
            return provider_key
    return "default"
def get_strategy(login: str) -> ProviderStrategy:
    """
    Return the throttling strategy for a mailbox or a provider key.

    Parameters
    ----------
    login : str
        Either an e-mail address, or a key of ``PROVIDER_STRATEGIES`` (as
        produced by ``get_provider_key`` / ``group_mails_by_provider``).

    Returns
    -------
    ProviderStrategy
        The matching strategy; the ``"default"`` strategy when nothing matches.
    """
    # Callers that have already grouped mailboxes by provider pass the key
    # itself. Running a bare key through domain matching would not recognise
    # it and would fall back to "default", so honour an exact key first.
    if login in PROVIDER_STRATEGIES:
        return PROVIDER_STRATEGIES[login]
    key = get_provider_key(login)
    return PROVIDER_STRATEGIES.get(key, PROVIDER_STRATEGIES["default"])
def group_mails_by_provider(mail_list: List) -> Dict[str, List]:
    """
    Group mailbox objects by provider key.

    Parameters
    ----------
    mail_list : List
        Mailbox objects; each must expose a ``.mail`` attribute.

    Returns
    -------
    Dict[str, List]
        ``{provider_key: [mail_objects]}``, with groups in order of first
        appearance and members in input order.
    """
    buckets: Dict[str, List] = {}
    for entry in mail_list:
        buckets.setdefault(get_provider_key(entry.mail), []).append(entry)
    return buckets
def apply_delay(strategy: ProviderStrategy) -> None:
    """Sleep for a random delay drawn from the strategy."""
    time.sleep(strategy.get_delay())
def apply_batch_delay(strategy: ProviderStrategy) -> None:
    """Sleep for the strategy's pause between batches."""
    pause_seconds = strategy.batch_delay
    time.sleep(pause_seconds)
class RateLimiter:
    """
    Minimal per-provider request counter that pauses once a batch is full.
    """

    def __init__(self):
        # Requests recorded per provider key since the last reset.
        self._request_counts: Dict[str, int] = {}
        # Initialised here; not read or written by any method of this class.
        self._last_batch_time: Dict[str, float] = {}

    def record_request(self, provider_key: str) -> None:
        """Count one request for ``provider_key``."""
        so_far = self._request_counts.get(provider_key, 0)
        self._request_counts[provider_key] = so_far + 1

    def should_wait(self, provider_key: str, strategy: ProviderStrategy) -> bool:
        """Return True when the recorded count has reached the strategy's batch size."""
        return strategy.should_wait_after_batch(
            self._request_counts.get(provider_key, 0)
        )

    def wait_if_needed(self, provider_key: str, strategy: ProviderStrategy) -> None:
        """Sleep for the batch delay and zero the counter, if the batch is full."""
        if not self.should_wait(provider_key, strategy):
            return
        apply_batch_delay(strategy)
        self._request_counts[provider_key] = 0

    def reset(self, provider_key: str) -> None:
        """Zero the counter for ``provider_key``."""
        self._request_counts[provider_key] = 0
if __name__ == "__main__":
    # Manual smoke test: print the strategy resolved for sample addresses.
    sample_addresses = (
        "user@gmail.com",
        "user@yahoo.com",
        "user@gmx.de",
        "user@gmx.fr",
        "user@outlook.com",
        "user@hotmail.com",
        "user@163.com",
        "user@rambler.ru",
        "user@naver.com",
        "user@onet.pl",
        "user@web.de",
        "user@inbox.lv",
        "user@sina.com",
        "user@pissmail.com",
        "user@unknown.com",
    )
    print("邮箱供应商策略测试:")
    print("=" * 70)
    for address in sample_addresses:
        resolved = get_strategy(address)
        print(f"{address:30}{resolved.name:15} (max_workers={resolved.max_workers}, "
              f"delay={resolved.min_delay}-{resolved.max_delay}s)")