import datetime import email import logging import os import re from concurrent.futures import ThreadPoolExecutor from email.header import decode_header from email.message import Message from typing import Union, List, Optional from dotenv import load_dotenv from imapclient import IMAPClient from db.mongo_manager import MONGO_STORE_MANAGER from excel_reader import read_contacts from mail.mail_constants import DOMAIN_HOTMAIL, create_imap from mail.imap_proxy_reader import ProxyIMAPClient, ProxyConfig, get_imap_server from models.ReserveResultPojo import ReserveResultPojo from models.mail_pojo import MailPojo, MailAddress # Charger les variables d'environnement depuis .env load_dotenv() # 定义常量 VALIDATION_URL_SUBJECT_FR = 'Validation de votre demande de rendez-vous' VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request' VALIDATION_URL_REGEX = r"https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+" PART_VALIDATION_URL_REGEX = r"client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+" HERMES_EMAIL = "no-reply@hermes.com" EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' # 日期格式 DATE_FORMAT = "%d-%b-%Y" # GMX域名列表(用于判断是否需要使用代理) GMX_DOMAINS = ( "gmx.com", "gmx.net", "gmx.de", "gmx.at", "gmx.fr", "gmx.us", "gmx.sg", "gmx.ch", "gmx.pt", ) def is_gmx_account(login: str) -> bool: """判断邮箱是否属于GMX域名""" return any(d in login.lower() for d in GMX_DOMAINS) # 邮箱列表(简化为常量) REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,rutger.62@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,lacerdaraffaello@aol.com,anasida76@aol.com,liamolinari@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,kotensasharev@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,damonsvensson@aol.com,anders.riva@aol.com,caiminwei123@gmail.com,yulingguo086@gmail.com,yingxiaolu086@gmail.com,lijiazhen0035@gmail.com,fangp370@gmail.com,huangyayu10086@gmail.com,fuziyuan110@gmail.com,xinyingdu886@gmail.com,yasiaforever.1971@aol.com,lukaszfidalgo@aol.com,zaichi29@aol.com,prostotakitak.1974@aol.com,mo90nroe@aol.com,blonde.87@aol.com,dimidrol.1969@aol.com" # 邮件处理相关函数 def is_valid_email(email: str) -> bool: """验证邮箱地址是否有效""" return re.fullmatch(EMAIL_ADDRESS_REGEX, email) is not None def extract_email_from_from_address(content: str) -> str: """从邮件地址中提取邮箱""" match = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', content) return match.group(0) if match else "" def find_from_mail(param) -> str: """解析邮件地址""" from_address, encoded_algo = param[0] # 处理字节编码 if isinstance(from_address, bytes): from_address = from_address.decode(encoded_algo) # 如果邮箱地址无效,尝试另一种编码 if not is_valid_email(from_address) and len(param) == 2: from_address, new_encode = param[1] if new_encode is None: new_encode = encoded_algo if isinstance(from_address, bytes): from_address = from_address.decode(new_encode) # 清理邮箱地址 return from_address.strip(" ").strip(">").strip("<") class MailReader: """邮件读取器类""" def __init__(self, login: str, password: str, proxy: Optional[ProxyConfig] = None, failed_gmx_list: Optional[List[str]] = None): self.login = login self.password = password self.proxy = proxy self.failed_gmx_list = failed_gmx_list if failed_gmx_list is not None else [] @staticmethod def show_folders(imap) -> List[str]: """获取邮箱文件夹列表""" folders = [] is_imap_client = isinstance(imap, IMAPClient) if not is_imap_client: # 处理非IMAPClient对象 for i in imap.list()[1]: l = i.decode().split(' "/" ') folders.append(l[1]) else: # 处理IMAPClient对象 folder_list = imap.list_folders() for i in folder_list: name = i[-1] folders.append(name) return folders def read_emails(self, mails_messages: List[MailPojo]) -> List[MailPojo]: """读取邮件""" # ── GMX账户 → 使用代理连接(失败自动重试最多3次)── if is_gmx_account(self.login) and self.proxy is not None: return self._read_emails_with_proxy_retry(mails_messages) else: return self._read_emails_internal(create_imap(self.login), mails_messages) def _read_emails_with_proxy_retry( self, mails_messages: List[MailPojo], max_retries: int = 8, ) -> List[MailPojo]: """通过 ProxyIMAPClient 读取 GMX 邮件,失败时最多重试 max_retries 次。""" imap_server = get_imap_server(self.login) last_error: Optional[Exception] = None for attempt in range(1, max_retries + 1): try: print("[GMX-Proxy] {} → {} via {} (tentative {}/{})".format( self.login, imap_server, self.proxy, attempt, max_retries)) imap = ProxyIMAPClient( host=imap_server, proxy=self.proxy, use_uid=True, ssl=True, ) return self._read_emails_internal(imap, mails_messages) except Exception as exc: last_error = exc print("[GMX-Proxy] Échec tentative {}/{} pour {} : {}".format( attempt, max_retries, self.login, exc)) print("[GMX-Proxy] Toutes les tentatives ont échoué pour {} : {}".format( self.login, last_error)) self.failed_gmx_list.append(self.login) return [] def _read_emails_internal(self, imap, mails_messages: List[MailPojo]) -> List[MailPojo]: """Logique commune de lecture des emails (IMAPClient ou imaplib).""" is_imap_client = isinstance(imap, IMAPClient) # 登录邮箱 if is_imap_client: dat = imap.login(self.login, str(self.password)) print("type is {} for {}".format(dat, self.login)) else: responseType, dat = imap.login(self.login, str(self.password)) print("type is {} for {}".format(responseType, self.login)) mail_list = [] print("read mails from {}".format(self.login)) # 获取文件夹列表 folder_list = self.show_folders(imap) # 处理每个文件夹 for folder in folder_list: print("folder is {}".format(folder)) # 跳过Sent和Drafts文件夹 if folder in ["Sent", "Drafts"]: continue if is_imap_client: # 使用IMAPClient处理 mail_list.extend(self._get_messages_from_folder_for_imapclient(imap, folder)) else: # 使用传统IMAP处理 mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_FR, folder=folder)) mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN, folder=folder)) # 关闭连接 if not is_imap_client: imap.close() imap.logout() # 添加邮件到结果列表 mails_messages.extend(mail_list) return mail_list def _get_messages_from_folder(self, imap, subject: str, folder: str = "INBOX") -> List[MailPojo]: """从指定文件夹获取邮件(传统IMAP方式)""" imap.select(folder) mail_messages = [] # 搜索邮件 search_query = '(SUBJECT "{}" SINCE "{}")'.format(subject, datetime.datetime.today().strftime(DATE_FORMAT)) typ, data = imap.search(None, search_query) for i in data[0].split(): try: # 获取邮件内容 res, msg = imap.fetch(i.decode("utf-8"), "(RFC822)") # 解析邮件 for response in msg: if isinstance(response, tuple): email_message = email.message_from_bytes(response[1]) # 解码主题 subject, subject_encoded = decode_header(email_message["Subject"])[0] if isinstance(subject, bytes): subject = subject.decode(subject_encoded) # 解码发件人地址 from_address = find_from_mail(decode_header(email_message.get("From"))) # 解码收件人地址 to_email = find_from_mail(decode_header(email_message.get("To"))) print("Email:", self.login) print("From:", from_address) print("To:", to_email) print("Subject:", subject) # 获取邮件正文 body = self._extract_body(email_message) # 检查是否是预约验证邮件 if VALIDATION_URL_SUBJECT_FR in subject or VALIDATION_URL_SUBJECT_EN in subject: mail = MailPojo( subject=subject, body=body, from_address=from_address ) # 设置收件人地址 if to_email is None: mail.to_address = self.login else: mail.to_address = to_email mail.mail_address = self.login mail_messages.append(mail) except Exception as error: print("Error processing email: {}".format(error)) return mail_messages def _extract_body(self, email_message: Message) -> str: """提取邮件正文""" body = "" # 遍历邮件部分 for part in email_message.walk(): try: content_type = part.get_content_type() if content_type == "text/html": # 处理HTML内容 payload = part.get_payload(decode=True) if payload: body += payload.decode("utf-8", errors="ignore") elif content_type == "text/plain": # 处理纯文本内容 payload = part.get_payload() if payload: body += payload except Exception as error: print("Error extracting body part: {}".format(error)) return body def _get_messages_from_folder_for_imapclient(self, imap, folder: str = "INBOX") -> List[MailPojo]: """从指定文件夹获取邮件(IMAPClient方式)""" mail_messages = [] # 搜索邮件 search_terms = 'SINCE "{}"'.format( datetime.datetime.today().strftime(DATE_FORMAT)) print("{}: search terms is {}".format(self.login, search_terms)) imap.select_folder(folder) messages = imap.search(['SINCE', datetime.datetime.today()]) print("{}: {} messages from our best friend".format(self.login, len(messages))) if len(messages) == 0: return mail_messages # 处理每封邮件 for uid, message_data in imap.fetch(messages, 'RFC822').items(): try: email_message = email.message_from_bytes(message_data[b'RFC822']) # 获取发件人和主题 from_address = email_message.get('FROM') subject = email_message.get('subject') # 检查是否是Hermes邮件 hermes_mail_address = "no-reply@hermes.com" if (hermes_mail_address in from_address or "outlook.com" in from_address or "hotmail" in from_address): # 提取邮件正文 body = self._extract_body_for_imapclient(email_message) # 检查是否是预约验证邮件 if (VALIDATION_URL_SUBJECT_FR in subject or VALIDATION_URL_SUBJECT_EN in subject or "Votre=20demande=20de=20rendez-vous" in subject or "Votre demande de rendez-vous" in body): mail = MailPojo( subject=subject, body=body, from_address=from_address ) mail.isImapClient = True print("email is {}".format(self.login)) print("body is {}".format(body)) print("subject is {}".format(subject)) # 设置收件人地址 if len(mail.to_address) == 0: if "outlook.com" in from_address or "hotmail.com" in from_address: # 转发邮件 mail.to_address = extract_email_from_from_address(from_address) else: mail.to_address = self.login mail_messages.append(mail) except Exception as error: print("Error trying to read email_Message for {}: {}".format(self.login, error)) return mail_messages def _extract_body_for_imapclient(self, email_message: Message) -> str: """提取IMAPClient邮件正文""" body = "" for part in email_message.walk(): content_type = part.get_content_type() if content_type == "text/html": payload = part.get_payload(decode=True) if payload: body += payload.decode("utf-8", errors="ignore") elif content_type == "text/plain": payload = part.get_payload() if payload: body += payload return body # 邮件处理相关函数 def find_item_by_url(url: str, successful_items) -> Union[None, ReserveResultPojo]: """根据URL查找预约结果对象""" print("url is :" + url) parts = url.split('/') _id = parts[5] if len(_id) == 6: for item in successful_items: if item.id == _id: return item return None def need_to_valid_url(url: str, item: Union[ReserveResultPojo, None]) -> bool: """判断是否需要验证URL""" print("url is :" + url) parts = url.split('/') _id = parts[5] if len(_id) == 6: if item: if item.url_validated is not None: return not item.url_validated else: # 如果url_validated为None,需要验证 return True return True print("id not valid:{}".format(_id)) return False def need_to_check_email(mail: str, successful_items) -> bool: """判断是否需要检查邮件""" print("successful_items size is " + str(len(successful_items))) # 特殊处理 if mail == "saigecong1990@pissmail.com": return True # 过滤已验证的项目 filtered_items = [item for item in successful_items if item.email == mail] # 检查是否有已验证的项目 validated_items = [item for item in filtered_items if item.url_validated is not None and item.url_validated is True] return len(validated_items) == 0 def find_links_to_validate_from_mail_list( mail_list: List[MailAddress], logger, proxy: Optional[ProxyConfig] = None, ) -> List[str]: """从邮件列表中查找需要验证的链接,返回读取失败的GMX账户列表""" if not mail_list: return [] # 检查时间前开始检查邮件 contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list() successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() mails_messages = [] failed_gmx: List[str] = [] # 使用线程池处理邮件 with ThreadPoolExecutor(max_workers=100) as executor: futures = [] for mail in mail_list: # 检查是否需要读取邮件 if need_to_check_email(mail.mail, successful_items): mail_reader = MailReader(mail.mail, mail.password, proxy=proxy, failed_gmx_list=failed_gmx) future = executor.submit(mail_reader.read_emails, mails_messages) futures.append(future) # 等待所有任务完成 for future in futures: try: future.result() except Exception as e: print("Error processing mail: {}".format(e)) # ── Résumé des comptes GMX en échec ────────────────────── if failed_gmx: print("\n[GMX-Proxy] ⚠️ {} compte(s) GMX non lus :".format(len(failed_gmx))) for addr in failed_gmx: print(" ✗ {}".format(addr)) else: print("\n[GMX-Proxy] ✅ Tous les comptes GMX ont été lus avec succès.") # 刷新成功的项目 _refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() # 处理邮件中的链接 for mail in mails_messages: match = re.search(VALIDATION_URL_REGEX, mail.body) if match: url = match.group(0) _item = find_item_by_url(url, _refreshed_successful_items) if need_to_valid_url(url, _item): logger.info("need to validate url: " + url) _model = "" _used_ip = "" if _item: _model = _item.model _used_ip = _item.current_ip MONGO_STORE_MANAGER.save_links_to_validate( url, mail.to_address, model=_model, _all_contact_list=contact_to_book_list, _used_ip= _used_ip) else: logger.info("do not need to click url --> {}".format(mail.mail_address)) return failed_gmx # 主函数 if __name__ == '__main__': # 读取联系人列表 contact_to_book_list = read_contacts( file_name="~/Desktop/contact_list_2026-03-05.xlsx") # file_name="~/Desktop/contact_list_2025-11-28.xlsx") # file_name="~/Desktop/contact_list_2025-11-06.xlsx") # 获取目标邮箱列表 all_mail_list = MONGO_STORE_MANAGER.get_destination_emails() # 筛选需要检查的邮件列表 mail_list_to_check = [] for contact in contact_to_book_list: for mail in all_mail_list: if contact.mail == mail.mail: mail_list_to_check.append(mail) # 设置日志记录器 logger = logging.getLogger() # 获取已验证的链接列表 _all_links = MONGO_STORE_MANAGER.get_links_to_validate() # 过滤掉已处理的邮件 filter_mail = [] for mail_pojo in mail_list_to_check: _to_add = True for _link in _all_links: if _link.email == mail_pojo.mail: _to_add = False if _to_add: filter_mail.append(mail_pojo) # filter_mail = [MailAddress("birgitnaya@gmx.net", "XEeUF3Y1yaO")] # ── Mode de lecture : GMX_ONLY=true → uniquement les comptes GMX ── gmx_only = os.environ.get("GMX_ONLY", "false").strip().lower() == "true" if gmx_only: filter_mail = [m for m in filter_mail if is_gmx_account(m.mail)] print("[Mode] Lecture GMX uniquement ({} comptes)".format(len(filter_mail))) else: print("[Mode] Lecture de tous les comptes ({} comptes)".format(len(filter_mail))) # 配置代理(GMX账号必须通过代理读取) gmx_proxy = ProxyConfig( host=os.environ.get("GMX_PROXY_HOST", ""), port=int(os.environ.get("GMX_PROXY_PORT", "443")), proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"), username=os.environ.get("GMX_PROXY_USERNAME"), password=os.environ.get("GMX_PROXY_PASSWORD"), ) # 处理邮件 failed = find_links_to_validate_from_mail_list(filter_mail, logger, proxy=gmx_proxy) # ── Afficher les comptes GMX non lus ───────────────────── if failed: print("\n===== Comptes GMX non lus ({}) =====".format(len(failed))) for addr in failed: print(" ✗ {}".format(addr)) else: print("\n===== Tous les comptes GMX ont été lus avec succès =====")