import datetime import email import logging import os import random import re import time from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed from email.header import decode_header from typing import Union, List, Optional, Dict from dotenv import load_dotenv from db.mongo_manager import MONGO_STORE_MANAGER from excel_reader import read_contacts from mail.mail_constants import DOMAIN_HOTMAIL, create_imap, show_folders from mail.imap_proxy_reader import ( ProxyIMAPClient, ProxyConfig, get_imap_server, extract_body, VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN, VALIDATION_URL_REGEX, DATE_FORMAT, ) from imapclient import IMAPClient from models.ReserveResultPojo import ReserveResultPojo from models.mail_pojo import MailPojo, MailAddress # Charger les variables d'environnement depuis .env load_dotenv() # ── Constantes locales ──────────────────────────────────────────────────────── # VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN, VALIDATION_URL_REGEX, # DATE_FORMAT sont importés depuis imap_proxy_reader (source de vérité unique). 
# ── Local constants ───────────────────────────────────────────────────────────
# Path fragment of a Hermes validation link: client/register/<ID>/validate?code=<CODE>.
PART_VALIDATION_URL_REGEX = r"client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"
HERMES_EMAIL = "no-reply@hermes.com"
# TLD class is [A-Za-z]; the former "[A-Z|a-z]" also accepted a literal "|".
EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'

# Timeouts (seconds)
IMAP_SOCKET_TIMEOUT = 300  # socket timeout passed to ProxyIMAPClient for every IMAP operation
# Intended budget for reading one mailbox. NOTE: it is not enforced per task by
# find_links_to_validate_from_mail_list; proxied reads are bounded per operation by
# IMAP_SOCKET_TIMEOUT instead.
FUTURE_TIMEOUT = 600

# Maximum number of concurrent reader threads per provider group, so that a single
# provider does not see a burst of logins.
MAX_WORKERS_PER_DOMAIN: Dict[str, int] = {
    "gmx": 80,
    "aol": 5,
    "gmail": 3,
    "yahoo": 5,
    "outlook": 5,
    "hotmail": 5,
    "firemail": 5,
    "inbox.lv": 5,
    "default": 5,
}

# Minimum gap (minutes) between two reads of the same mailbox, to avoid repeated logins.
MAIL_READ_MIN_INTERVAL_MINUTES = 15

# GMX domains.
GMX_DOMAINS = (
    "gmx.com", "gmx.net", "gmx.de", "gmx.at", "gmx.fr",
    "gmx.us", "gmx.sg", "gmx.ch", "gmx.pt",
)

# Domains whose mailboxes are read through a proxy (GMX + Yahoo).
PROXY_DOMAINS = GMX_DOMAINS + ("yahoo.com",)


def _domain_part(login: str) -> str:
    """Return the lower-cased text after the last "@" of *login* (the whole string if none).

    Matching on the domain only prevents a local part such as "gmxfan@gmail.com"
    from being classified by the "gmx" rules.
    """
    return login.lower().rpartition("@")[2]


def is_gmx_account(login: str) -> bool:
    """Return True when the domain of *login* is one of GMX_DOMAINS."""
    domain = _domain_part(login)
    return any(d in domain for d in GMX_DOMAINS)


def is_proxy_account(login: str) -> bool:
    """Return True when *login* must be read through a proxy (its domain is in PROXY_DOMAINS)."""
    domain = _domain_part(login)
    return any(d in domain for d in PROXY_DOMAINS)


def get_domain_group(login: str) -> str:
    """Map a mailbox address to its rate-limiting group key in MAX_WORKERS_PER_DOMAIN.

    Example: "user@gmx.net" -> "gmx", "user@aol.com" -> "aol"; unknown domains -> "default".
    """
    domain = _domain_part(login)
    for key in MAX_WORKERS_PER_DOMAIN:
        if key != "default" and key in domain:
            return key
    return "default"


# Redirection mailbox list (kept as a single comma-separated constant).
REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,rutger.62@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,lacerdaraffaello@aol.com,anasida76@aol.com,liamolinari@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,kotensasharev@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,damonsvensson@aol.com,anders.riva@aol.com,caiminwei123@gmail.com,yulingguo086@gmail.com,yingxiaolu086@gmail.com,lijiazhen0035@gmail.com,fangp370@gmail.com,huangyayu10086@gmail.com,fuziyuan110@gmail.com,xinyingdu886@gmail.com,yasiaforever.1971@aol.com,lukaszfidalgo@aol.com,zaichi29@aol.com,prostotakitak.1974@aol.com,mo90nroe@aol.com,blonde.87@aol.com,dimidrol.1969@aol.com"


# ── Mail header helpers ───────────────────────────────────────────────────────
def is_valid_email(email: str) -> bool:
    """Return True when the whole string matches EMAIL_ADDRESS_REGEX."""
    return re.fullmatch(EMAIL_ADDRESS_REGEX, email) is not None


def extract_email_from_from_address(content: str) -> str:
    """Return the first e-mail address found in *content*, or "" when there is none."""
    match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', content)
    return match.group(0) if match else ""


def _decode_header_chunk(value, charset) -> str:
    """Decode one ``(value, charset)`` pair produced by ``decode_header`` into text.

    ``decode_header`` returns ``bytes`` with a ``None`` charset for unencoded parts of a
    mixed header; those (and unknown charset labels) are decoded as UTF-8 instead of
    raising. Undecodable bytes are replaced rather than aborting the whole message.
    """
    if not isinstance(value, bytes):
        return value
    try:
        return value.decode(charset or "utf-8", errors="replace")
    except LookupError:  # unknown charset label, e.g. "unknown-8bit"
        return value.decode("utf-8", errors="replace")


def _decode_header_value(raw) -> str:
    """Decode a possibly RFC 2047-encoded header into text; a missing header yields ""."""
    if raw is None:
        return ""
    return "".join(_decode_header_chunk(value, charset) for value, charset in decode_header(raw))


def _address_from_header(raw) -> str:
    """Extract the mail address of an address header (From/To); a missing header yields ""."""
    if raw is None:
        return ""
    return find_from_mail(decode_header(raw))


def find_from_mail(param) -> str:
    """Return the mail address contained in a ``decode_header`` result.

    The first chunk is used when it is already a valid address; otherwise, for a
    two-chunk header (typically ``=?utf-8?...?= <addr>``), the second chunk is used.
    Surrounding spaces and angle brackets are stripped. If the result is still not a
    bare address (e.g. ``Name <user@x.com>`` in a single chunk), the first address found
    in the decoded header text is returned instead, falling back to the cleaned text.
    """
    first_value, first_charset = param[0]
    address = _decode_header_chunk(first_value, first_charset)

    # First chunk is not an address (usually an encoded display name): try the second one.
    if not is_valid_email(address) and len(param) == 2:
        second_value, second_charset = param[1]
        address = _decode_header_chunk(second_value, second_charset or first_charset)

    cleaned = address.strip(" ").strip(">").strip("<")
    if is_valid_email(cleaned):
        return cleaned
    full_text = "".join(_decode_header_chunk(value, charset) for value, charset in param)
    return extract_email_from_from_address(full_text) or cleaned


# ── Mailbox reader ────────────────────────────────────────────────────────────
class MailReader:
    """Read today's Hermes validation mails from one mailbox, directly or through a proxy."""

    def __init__(self, login: str, password: str, proxy: Optional[ProxyConfig] = None,
                 failed_gmx_list: Optional[List[str]] = None, delay_range: tuple = (1.0, 5.0)):
        """
        :param login: mailbox address, also used as IMAP user name.
        :param password: IMAP password (converted with ``str()`` at login).
        :param proxy: proxy used for proxied domains (see ``is_proxy_account``); None = direct.
        :param failed_gmx_list: shared list that receives the login when every proxy attempt fails.
        :param delay_range: (min_seconds, max_seconds) of the random delay before reading.
        """
        self.login = login
        self.password = password
        self.proxy = proxy
        self.failed_gmx_list = failed_gmx_list if failed_gmx_list is not None else []
        self.delay_range = delay_range

    def read_emails(self, mails_messages: List[MailPojo]) -> List[MailPojo]:
        """Read the mailbox after a random delay, then record the read time in Mongo.

        Proxied domains go through ``_read_emails_with_proxy_retry`` (which never raises);
        the others use ``create_imap`` directly and propagate their errors.

        :param mails_messages: shared accumulator; found mails are appended to it.
        :return: the mails found in this mailbox.
        """
        # Random delay to spread logins over time.
        time.sleep(random.uniform(*self.delay_range))

        if is_proxy_account(self.login) and self.proxy is not None:
            result = self._read_emails_with_proxy_retry(mails_messages)
        else:
            result = self._read_emails_internal(create_imap(self.login), mails_messages)

        # Recorded even when every proxy attempt failed, so need_to_check_email will skip
        # this mailbox for MAIL_READ_MIN_INTERVAL_MINUTES (no burst of repeated logins).
        MONGO_STORE_MANAGER.update_mail_read_time(self.login)
        return result

    def _read_emails_with_proxy_retry(
            self,
            mails_messages: List[MailPojo],
            max_retries: int = 8,
    ) -> List[MailPojo]:
        """Read through ProxyIMAPClient, retrying up to *max_retries* times on any error.

        When every attempt fails, the login is appended to ``failed_gmx_list`` and an
        empty list is returned (no exception escapes).
        """
        imap_server = get_imap_server(self.login)
        last_error: Optional[Exception] = None
        for attempt in range(1, max_retries + 1):
            try:
                print("[Proxy] {} → {} via {} (tentative {}/{})".format(
                    self.login, imap_server, self.proxy, attempt, max_retries))
                imap = ProxyIMAPClient(
                    host=imap_server,
                    proxy=self.proxy,
                    use_uid=True,
                    ssl=True,
                    timeout=IMAP_SOCKET_TIMEOUT,
                )
                return self._read_emails_internal(imap, mails_messages)
            except Exception as exc:
                last_error = exc
                print("[Proxy] Échec tentative {}/{} pour {} : {}".format(
                    attempt, max_retries, self.login, exc))

        print("[Proxy] Toutes les tentatives ont échoué pour {} : {}".format(
            self.login, last_error))
        self.failed_gmx_list.append(self.login)
        return []

    def _read_emails_internal(self, imap, mails_messages: List[MailPojo]) -> List[MailPojo]:
        """Log in, scan every folder except Sent/Drafts and collect validation mails.

        Supports IMAPClient connections and imaplib-style ones (whose ``login`` returns a
        ``(status, data)`` tuple). The connection is always closed/logged out, even when
        reading fails, so failed proxy attempts do not leak sockets.

        :param imap: an already-connected IMAP connection.
        :param mails_messages: shared accumulator, extended only when reading succeeds.
        :return: the mails found in this mailbox.
        :raises: whatever login/search/fetch raise; the caller decides whether to retry.
        """
        is_imap_client = isinstance(imap, IMAPClient)
        try:
            if is_imap_client:
                dat = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(dat, self.login))
            else:
                response_type, _ = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(response_type, self.login))

            mail_list: List[MailPojo] = []
            print("read mails from {}".format(self.login))
            for folder in show_folders(imap):
                print("folder is {}".format(folder))
                if folder in ("Sent", "Drafts"):
                    continue
                if is_imap_client:
                    mail_list.extend(self._get_messages_from_folder_for_imapclient(imap, folder))
                else:
                    for subject in (VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN):
                        mail_list.extend(
                            self._get_messages_from_folder(imap, subject=subject, folder=folder))
        finally:
            self._close_connection(imap, is_imap_client)

        mails_messages.extend(mail_list)
        return mail_list

    def _close_connection(self, imap, is_imap_client: bool) -> None:
        """Best-effort cleanup: close the selected folder (imaplib style only), then log out.

        Errors are printed instead of raised, so they neither hide a reading error nor
        discard mails that were already read.
        """
        if not is_imap_client:
            try:
                imap.close()
            except Exception as error:
                # e.g. no folder is selected (every SELECT failed) or the link already dropped.
                print("[IMAP] close() failed for {}: {}".format(self.login, error))
        try:
            imap.logout()
        except Exception as error:
            print("[IMAP] logout() failed for {}: {}".format(self.login, error))

    def _get_messages_from_folder(self, imap, subject: str, folder: str = "INBOX") -> List[MailPojo]:
        """Return today's validation mails whose subject contains *subject* (imaplib API).

        Matching messages are retrieved with a single batched FETCH. An empty list is
        returned when the folder cannot be selected, the search fails or nothing matches.
        """
        mail_messages: List[MailPojo] = []

        # imaplib sends mailbox names verbatim, so names containing spaces must be quoted.
        mailbox = folder
        if " " in mailbox and not mailbox.startswith('"'):
            mailbox = '"{}"'.format(mailbox)
        select_status, _ = imap.select(mailbox)
        if select_status != "OK":
            # A failed SELECT leaves imaplib in AUTH state, where SEARCH would raise and
            # abort the whole mailbox; skip just this folder instead.
            print("Cannot select folder {} for {}".format(folder, self.login))
            return mail_messages

        search_query = '(SUBJECT "{}" SINCE "{}")'.format(
            subject, datetime.datetime.today().strftime(DATE_FORMAT))
        status, data = imap.search(None, search_query)
        ids = data[0].split() if status == "OK" and data and data[0] else []
        if not ids:
            return mail_messages

        # One FETCH for all matching ids instead of one round-trip per message.
        try:
            _, msg_list = imap.fetch(b",".join(ids), "(RFC822)")
        except Exception as error:
            print("Batch fetch error in folder {}: {}".format(folder, error))
            return mail_messages

        for response in msg_list:
            # FETCH results interleave (envelope, body) tuples with closing b")" markers.
            if not isinstance(response, tuple):
                continue
            try:
                email_message = email.message_from_bytes(response[1])
                subject_decoded = _decode_header_value(email_message["Subject"])
                from_address = _address_from_header(email_message.get("From"))
                to_email = _address_from_header(email_message.get("To"))

                print("Email:", self.login)
                print("From:", from_address)
                print("To:", to_email)
                print("Subject:", subject_decoded)

                body = extract_body(email_message)

                if VALIDATION_URL_SUBJECT_FR in subject_decoded or VALIDATION_URL_SUBJECT_EN in subject_decoded:
                    mail = MailPojo(
                        subject=subject_decoded,
                        body=body,
                        from_address=from_address,
                    )
                    # Missing/empty To header: the mail belongs to the mailbox being read.
                    mail.to_address = to_email or self.login
                    mail.mail_address = self.login
                    mail_messages.append(mail)
            except Exception as error:
                print("Error processing email: {}".format(error))
        return mail_messages

    def _get_messages_from_folder_for_imapclient(self, imap, folder: str = "INBOX") -> List[MailPojo]:
        """Return today's validation mails of one folder (IMAPClient API).

        Only messages from Hermes, or forwarded from outlook.com/hotmail addresses, are
        considered. A folder that cannot be selected is skipped; connection-level aborts
        still propagate so the caller can retry.
        """
        mail_messages: List[MailPojo] = []
        today = datetime.datetime.today()

        search_terms = 'SINCE "{}"'.format(today.strftime(DATE_FORMAT))
        print("{}: search terms is {}".format(self.login, search_terms))
        try:
            imap.select_folder(folder)
        except IMAPClient.AbortError:
            raise
        except IMAPClient.Error as error:
            print("Cannot select folder {} for {}: {}".format(folder, self.login, error))
            return mail_messages

        messages = imap.search(['SINCE', today])
        print("{}: {} messages from our best friend".format(self.login, len(messages)))
        if not messages:
            return mail_messages

        for message_data in imap.fetch(messages, 'RFC822').values():
            try:
                email_message = email.message_from_bytes(message_data[b'RFC822'])
                # str(): compat32 may return a Header object for non-ASCII raw headers.
                from_address = str(email_message.get('FROM') or "")
                subject = str(email_message.get('subject') or "")

                is_candidate_sender = (HERMES_EMAIL in from_address
                                       or "outlook.com" in from_address
                                       or "hotmail" in from_address)
                if not is_candidate_sender:
                    continue

                body = extract_body(email_message)
                # The quoted-printable form "=20" is also checked because the subject is
                # used as returned by email_message.get(), without decoding.
                is_validation_mail = (VALIDATION_URL_SUBJECT_FR in subject
                                      or VALIDATION_URL_SUBJECT_EN in subject
                                      or "Votre=20demande=20de=20rendez-vous" in subject
                                      or "Votre demande de rendez-vous" in (body or ""))
                if not is_validation_mail:
                    continue

                mail = MailPojo(
                    subject=subject,
                    body=body,
                    from_address=from_address,
                )
                mail.isImapClient = True
                print("email is {}".format(self.login))
                print("body is {}".format(body))
                print("subject is {}".format(subject))

                if not mail.to_address:
                    if "outlook.com" in from_address or "hotmail.com" in from_address:
                        # Forwarded mail: the original recipient is the forwarding address.
                        mail.to_address = extract_email_from_from_address(from_address)
                    else:
                        mail.to_address = self.login
                mail.mail_address = self.login
                mail_messages.append(mail)
            except Exception as error:
                print("Error trying to read email_Message for {}: {}".format(self.login, error))
        return mail_messages


# ── Validation link helpers ───────────────────────────────────────────────────
def _extract_reservation_id(url: str) -> Optional[str]:
    """Return the sixth "/"-separated segment of *url* (the reservation id), or None if absent."""
    parts = url.split('/')
    return parts[5] if len(parts) > 5 else None


def find_item_by_url(url: str, successful_items) -> Union[None, ReserveResultPojo]:
    """Return the reservation whose ``id`` equals the 6-character id embedded in *url*, or None."""
    print("url is :" + url)
    _id = _extract_reservation_id(url)
    if _id is None or len(_id) != 6:
        return None
    return next((item for item in successful_items if item.id == _id), None)


def need_to_valid_url(url: str, item: Union[ReserveResultPojo, None]) -> bool:
    """Decide whether *url* still has to be clicked.

    Returns False when the URL carries no 6-character id. Otherwise returns True unless
    *item* is known and its ``url_validated`` flag is set, in which case the negation of
    that flag is returned.
    """
    print("url is :" + url)
    _id = _extract_reservation_id(url)
    if _id is not None and len(_id) == 6:
        if item and item.url_validated is not None:
            return not item.url_validated
        return True
    print("id not valid:{}".format(_id))
    return False


def need_to_check_email(mail: str, successful_items) -> bool:
    """Decide whether mailbox *mail* should be read now.

    Returns False when:
    1. one of its reservations already has ``url_validated is True``; or
    2. it was read less than MAIL_READ_MIN_INTERVAL_MINUTES ago (avoids repeated logins).
    """
    print("successful_items size is " + str(len(successful_items)))
    if any(item.email == mail and item.url_validated is True for item in successful_items):
        return False

    last_read = MONGO_STORE_MANAGER.get_last_mail_read_time(mail)
    if last_read is not None:
        # NOTE(review): assumes the stored read time is a naive UTC datetime — confirm.
        elapsed_minutes = (datetime.datetime.utcnow() - last_read).total_seconds() / 60
        if elapsed_minutes < MAIL_READ_MIN_INTERVAL_MINUTES:
            print("[跳过] {} 距上次读取仅 {:.1f} 分钟,未达到最小间隔 {} 分钟".format(
                mail, elapsed_minutes, MAIL_READ_MIN_INTERVAL_MINUTES))
            return False
    return True


# ── Orchestration ─────────────────────────────────────────────────────────────
def _group_mails_by_domain(
        mail_list: List[MailAddress],
        successful_items,
        proxy: Optional[ProxyConfig],
        proxy_pool: Optional[List[ProxyConfig]],
) -> Dict[str, List[tuple]]:
    """Drop mailboxes that need no check, assign each a proxy, and bucket them by domain group."""
    grouped: Dict[str, List[tuple]] = defaultdict(list)
    for idx, mail in enumerate(mail_list):
        if not need_to_check_email(mail.mail, successful_items):
            continue
        if is_proxy_account(mail.mail):
            assigned_proxy = proxy  # proxied domains always use the dedicated proxy
        elif proxy_pool:
            assigned_proxy = proxy_pool[idx % len(proxy_pool)]  # round-robin over the pool
        else:
            assigned_proxy = None  # direct connection
        grouped[get_domain_group(mail.mail)].append((mail, assigned_proxy))
    return grouped


def _read_all_mailboxes(grouped: Dict[str, List[tuple]],
                        mails_messages: List[MailPojo],
                        failed_gmx: List[str]) -> None:
    """Read every grouped mailbox with one thread pool per domain group, printing progress."""
    future_to_mail: Dict[object, str] = {}
    executors = []
    try:
        for group_key, items in grouped.items():
            max_w = MAX_WORKERS_PER_DOMAIN.get(group_key, MAX_WORKERS_PER_DOMAIN["default"])
            executor = ThreadPoolExecutor(max_workers=max_w)
            executors.append(executor)
            print("[限流] 域名组 '{}': {} 账号,max_workers={}".format(
                group_key, len(items), max_w))
            for mail, assigned_proxy in items:
                mail_reader = MailReader(
                    mail.mail,
                    mail.password,
                    proxy=assigned_proxy,
                    failed_gmx_list=failed_gmx,
                )
                future = executor.submit(mail_reader.read_emails, mails_messages)
                future_to_mail[future] = mail.mail

        total = len(future_to_mail)
        # Futures yielded by as_completed() are already finished, so result() never
        # blocks here; a per-call timeout would have no effect.
        for completed, future in enumerate(as_completed(future_to_mail), start=1):
            mail_addr = future_to_mail[future]
            try:
                future.result()
            except TimeoutError as e:
                print("[进度] {}/{} {} — Timeout, lecture ignorée: {}".format(
                    completed, total, mail_addr, e))
            except Exception as e:
                print("[进度] {}/{} {} — Erreur: {}".format(completed, total, mail_addr, e))
            else:
                print("[进度] {}/{} {}".format(completed, total, mail_addr))
    finally:
        for executor in executors:
            executor.shutdown(wait=False)


def _save_validation_links(mails_messages: List[MailPojo], contact_to_book_list, logger) -> None:
    """Store in Mongo every validation URL found in *mails_messages* that still needs a click."""
    refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    seen_urls = set()
    for mail in mails_messages:
        match = re.search(VALIDATION_URL_REGEX, mail.body or "")
        if not match:
            continue
        url = match.group(0)
        # The same mail can be found in several folders: save each link only once.
        if url in seen_urls:
            continue
        seen_urls.add(url)

        _item = find_item_by_url(url, refreshed_successful_items)
        if need_to_valid_url(url, _item):
            logger.info("need to validate url: " + url)
            _model = _item.model if _item else ""
            _used_ip = _item.current_ip if _item else ""
            MONGO_STORE_MANAGER.save_links_to_validate(
                url, mail.to_address, model=_model,
                _all_contact_list=contact_to_book_list, _used_ip=_used_ip)
        else:
            logger.info("do not need to click url --> {}".format(mail.mail_address))


def find_links_to_validate_from_mail_list(
        mail_list: List[MailAddress],
        logger,
        proxy: Optional[ProxyConfig] = None,
        proxy_pool: Optional[List[ProxyConfig]] = None,
) -> List[str]:
    """Read the given mailboxes, store the validation links found, and report failures.

    :param mail_list: mailboxes to read; each is skipped when need_to_check_email says so.
    :param logger: logger used for per-link decisions.
    :param proxy: dedicated proxy for proxied domains (GMX / yahoo.com).
    :param proxy_pool: proxies rotated over the other mailboxes; None/empty = direct.
    :return: logins of proxied mailboxes for which every read attempt failed.
    """
    if not mail_list:
        return []

    contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list()
    successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    mails_messages: List[MailPojo] = []
    failed_gmx: List[str] = []

    grouped = _group_mails_by_domain(mail_list, successful_items, proxy, proxy_pool)
    _read_all_mailboxes(grouped, mails_messages, failed_gmx)

    if failed_gmx:
        print("\n[Proxy] ⚠️ {} compte(s) non lus (GMX / inbox.lv) :".format(len(failed_gmx)))
        for addr in failed_gmx:
            print(" ✗ {}".format(addr))
    else:
        print("\n[Proxy] ✅ Tous les comptes GMX / inbox.lv ont été lus avec succès.")

    _save_validation_links(mails_messages, contact_to_book_list, logger)
    return failed_gmx


# ── Script entry point ────────────────────────────────────────────────────────
# Default contact list; override with the CONTACT_LIST_FILE environment variable.
# Other lists used previously: contact_list_inbox_lv_100.xlsx,
# contact_list_2026-04-21_200_yahoo.xlsx, contact_list_yahoo_100_20_04.xlsx,
# contact_yahoo_5.xlsx, contact_list_2026-04-24_yahoo_50.xlsx, contact_list_2026-04-11.xlsx,
# contact_list_2026-04-17.xlsx, contact_list_inbox_100_14_04.xlsx,
# contact_list_2024-09-02_firemail_de_100.xlsx, reste_inbox_lv.xlsx, yahooo_list.xlsx
_DEFAULT_CONTACT_LIST_FILE = "~/Desktop/contact_list_2026-04-23.xlsx"


def _select_mails_to_check(contacts, all_mail_list, all_links) -> list:
    """Return the destination mailboxes of *contacts* that have no validation link yet.

    Keeps the contact-major order (and duplicates) of a nested contact × mailbox scan,
    but with dictionary lookups instead of an O(n·m) double loop.
    """
    mails_by_address = defaultdict(list)
    for mail in all_mail_list:
        mails_by_address[mail.mail].append(mail)
    candidates = [mail for contact in contacts for mail in mails_by_address.get(contact.mail, [])]

    linked_emails = {link.email for link in all_links}
    return [mail for mail in candidates if mail.mail not in linked_emails]


def _build_proxy_pool() -> Optional[List[ProxyConfig]]:
    """Parse PROXY_POOL_HOSTS ("host1:port1,host2:port2") into ProxyConfig objects.

    Entries use the GMX_PROXY_* type and credentials. Malformed entries are reported and
    skipped. Returns None when the variable is unset or empty.
    """
    raw = os.environ.get("PROXY_POOL_HOSTS", "").strip()
    if not raw:
        return None
    pool: List[ProxyConfig] = []
    for entry in raw.split(","):
        entry = entry.strip()
        host, sep, port = entry.rpartition(":")
        if not sep or not port.strip().isdigit():
            if entry:
                print("[Proxy] Entrée PROXY_POOL_HOSTS ignorée (host:port attendu) : {}".format(entry))
            continue
        pool.append(ProxyConfig(
            host=host,
            port=int(port),
            proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
            username=os.environ.get("GMX_PROXY_USERNAME"),
            password=os.environ.get("GMX_PROXY_PASSWORD"),
        ))
    return pool


def _main() -> None:
    """Read the contact list, check the matching mailboxes and print unread proxied accounts."""
    # Without a handler/level, the logger.info() calls would emit nothing.
    logging.basicConfig(level=logging.INFO)

    contact_to_book_list = read_contacts(
        file_name=os.environ.get("CONTACT_LIST_FILE", _DEFAULT_CONTACT_LIST_FILE))
    all_mail_list = MONGO_STORE_MANAGER.get_destination_emails()
    logger = logging.getLogger()
    _all_links = MONGO_STORE_MANAGER.get_links_to_validate()

    filter_mail = _select_mails_to_check(contact_to_book_list, all_mail_list, _all_links)

    # GMX_ONLY=true -> read only the GMX accounts.
    gmx_only = os.environ.get("GMX_ONLY", "false").strip().lower() == "true"
    if gmx_only:
        filter_mail = [m for m in filter_mail if is_gmx_account(m.mail)]
        print("[Mode] Lecture GMX uniquement ({} comptes)".format(len(filter_mail)))
    else:
        print("[Mode] Lecture de tous les comptes ({} comptes)".format(len(filter_mail)))

    # Proxy for GMX (and other PROXY_DOMAINS) accounts, which must be read through it.
    gmx_proxy_host = os.environ.get("GMX_PROXY_HOST", "")
    if not gmx_proxy_host:
        print("[Proxy] ⚠️ GMX_PROXY_HOST non défini : les comptes via proxy risquent d'échouer.")
    gmx_proxy = ProxyConfig(
        host=gmx_proxy_host,
        port=int(os.environ.get("GMX_PROXY_PORT", "443")),
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )

    failed = find_links_to_validate_from_mail_list(
        filter_mail, logger, proxy=gmx_proxy, proxy_pool=_build_proxy_pool()
    )

    if failed:
        print("\n===== Comptes GMX non lus ({}) =====".format(len(failed)))
        for addr in failed:
            print(" ✗ {}".format(addr))
    else:
        print("\n===== Tous les comptes GMX ont été lus avec succès =====")


if __name__ == '__main__':
    _main()