Files
appointment_request/mail/mail_reader_all_contacts.py
T
2026-04-09 08:02:23 +02:00

557 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import email
import logging
import os
import re
from concurrent.futures import ThreadPoolExecutor
from email.header import decode_header
from email.message import Message
from typing import Union, List, Optional
from dotenv import load_dotenv
from imapclient import IMAPClient
from db.mongo_manager import MONGO_STORE_MANAGER
from excel_reader import read_contacts
from mail.mail_constants import DOMAIN_HOTMAIL, create_imap
from mail.imap_proxy_reader import ProxyIMAPClient, ProxyConfig, get_imap_server
from models.ReserveResultPojo import ReserveResultPojo
from models.mail_pojo import MailPojo, MailAddress
# Load environment variables from the .env file
load_dotenv()
# Constants
# Subjects of the appointment-validation mails (French and English variants).
VALIDATION_URL_SUBJECT_FR = 'Validation de votre demande de rendez-vous'
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request'
# Full and partial patterns of the validation link searched in mail bodies.
# NOTE(review): the "." characters (host name and "validate.code") are unescaped
# and therefore match any character; presumably "validate.code" stands for
# "validate?code" - confirm before tightening.
VALIDATION_URL_REGEX = r"https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"
PART_VALIDATION_URL_REGEX = r"client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"
HERMES_EMAIL = "no-reply@hermes.com"
# NOTE(review): inside the character class "[A-Z|a-z]" the "|" is a literal
# pipe, not an alternation, so "|" is accepted in the top-level domain.
EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
# Date format used to build the IMAP "SINCE" search criterion
DATE_FORMAT = "%d-%b-%Y"
# GMX timeouts (in seconds)
IMAP_SOCKET_TIMEOUT = 300 # socket timeout for each IMAP operation
FUTURE_TIMEOUT = 600 # maximum time allotted to reading one mailbox
# GMX domain list (used to decide whether a proxy is required)
GMX_DOMAINS = (
"gmx.com", "gmx.net", "gmx.de", "gmx.at",
"gmx.fr", "gmx.us", "gmx.sg", "gmx.ch", "gmx.pt",
)
# Domains whose mailboxes must be read through the proxy
PROXY_DOMAINS = GMX_DOMAINS + ("inbox.lv",)
def is_gmx_account(login: str) -> bool:
    """Return True when ``login`` contains one of the GMX domain names.

    The comparison is a case-insensitive substring test against every
    entry of ``GMX_DOMAINS``.
    """
    lowered = login.lower()
    for domain in GMX_DOMAINS:
        if domain in lowered:
            return True
    return False
def is_proxy_account(login: str) -> bool:
    """Return True when the mailbox must be read through the proxy.

    That is the case when ``login`` contains (case-insensitively) one of
    the ``PROXY_DOMAINS`` entries, i.e. a GMX domain or inbox.lv.
    """
    lowered = login.lower()
    matching_domains = [domain for domain in PROXY_DOMAINS if domain in lowered]
    return bool(matching_domains)
# Redirection mailbox list (kept as a single comma-separated string constant)
REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,rutger.62@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,lacerdaraffaello@aol.com,anasida76@aol.com,liamolinari@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,kotensasharev@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,damonsvensson@aol.com,anders.riva@aol.com,caiminwei123@gmail.com,yulingguo086@gmail.com,yingxiaolu086@gmail.com,lijiazhen0035@gmail.com,fangp370@gmail.com,huangyayu10086@gmail.com,fuziyuan110@gmail.com,xinyingdu886@gmail.com,yasiaforever.1971@aol.com,lukaszfidalgo@aol.com,zaichi29@aol.com,prostotakitak.1974@aol.com,mo90nroe@aol.com,blonde.87@aol.com,dimidrol.1969@aol.com"
# Mail-handling helper functions
def is_valid_email(email: str) -> bool:
    """Check whether the whole string matches ``EMAIL_ADDRESS_REGEX``."""
    whole_match = re.fullmatch(EMAIL_ADDRESS_REGEX, email)
    if whole_match is None:
        return False
    return True
def extract_email_from_from_address(content: str) -> str:
    """Return the first e-mail address found in ``content``.

    An empty string is returned when no address-like token is present.
    """
    found = re.search(r'[\w.+-]+@[\w-]+\.[\w.-]+', content)
    if not found:
        return ""
    return found.group(0)
def _decode_address_fragment(value, charset, fallback_charset=None):
    """Return ``value`` as text, decoding bytes with the first usable charset."""
    if not isinstance(value, bytes):
        return value
    for candidate in (charset, fallback_charset):
        if candidate:
            try:
                return value.decode(candidate, errors="replace")
            except LookupError:
                # Unknown charset label (e.g. "unknown-8bit"): try the next one.
                continue
    # decode_header() reports charset None for un-encoded fragments.
    return value.decode("utf-8", errors="replace")


def find_from_mail(param) -> str:
    """Extract an address from the output of ``email.header.decode_header``.

    Args:
        param: list of ``(value, charset)`` pairs; ``value`` is ``str`` or
            ``bytes`` and ``charset`` may be ``None``.

    Returns:
        The first fragment as text. When the list has exactly two fragments
        and the first one is not a valid e-mail address, the second fragment
        is used instead (decoded with its own charset, falling back to the
        charset of the first fragment). Surrounding spaces and angle brackets
        are removed. An empty list yields an empty string.
    """
    if not param:
        return ""
    raw_address, first_charset = param[0]
    from_address = _decode_address_fragment(raw_address, first_charset)
    # Cheap length check first; the validity test only matters when there is
    # a second fragment to fall back to.
    if len(param) == 2 and not is_valid_email(from_address):
        raw_address, second_charset = param[1]
        from_address = _decode_address_fragment(raw_address, second_charset, first_charset)
    # Strip spaces and both angle brackets regardless of their order.
    return from_address.strip(" <>")
class MailReader:
    """Read one mailbox over IMAP and collect appointment-validation mails.

    Two kinds of connection objects are supported: ``IMAPClient`` instances
    (including the proxy client) and imaplib-style objects returned by
    ``create_imap``. The connection is always logged out, even when reading
    fails, so retries do not leak sockets.
    """

    # Folders that are not searched.
    _SKIPPED_FOLDERS = ("Sent", "Drafts")
    # One line of an imaplib LIST response: (flags) "delimiter" name
    _LIST_LINE = re.compile(
        r'^\((?P<flags>[^)]*)\)\s+(?P<delimiter>"(?:[^"\\]|\\.)*"|NIL)\s+(?P<name>.+)$')

    def __init__(self, login: str, password: str, proxy: Optional["ProxyConfig"] = None,
                 failed_gmx_list: Optional[List[str]] = None):
        """Store the credentials, the optional proxy and the shared failure list.

        Args:
            login: mailbox address, also used as the IMAP login.
            password: mailbox password.
            proxy: proxy configuration used for GMX / inbox.lv mailboxes.
            failed_gmx_list: list (possibly shared between readers) that
                receives ``login`` when every proxy attempt fails.
        """
        self.login = login
        self.password = password
        self.proxy = proxy
        self.failed_gmx_list = failed_gmx_list if failed_gmx_list is not None else []

    @staticmethod
    def _decode_bytes(raw: bytes, charset, errors: str = "replace") -> str:
        """Decode ``raw`` with ``charset``, falling back to UTF-8."""
        try:
            return raw.decode(charset or "utf-8", errors=errors)
        except LookupError:
            # Unknown charset label announced by the message.
            return raw.decode("utf-8", errors=errors)

    @staticmethod
    def _decode_header_value(raw_value) -> str:
        """Return a header as text, joining every RFC 2047 fragment."""
        if raw_value is None:
            return ""
        fragments = []
        for value, charset in decode_header(raw_value):
            if isinstance(value, bytes):
                value = MailReader._decode_bytes(value, charset)
            fragments.append(value)
        return "".join(fragments)

    @staticmethod
    def _disconnect(imap, close_mailbox: bool) -> None:
        """Close the selected mailbox (if any) and log out, never raising."""
        if close_mailbox:
            try:
                imap.close()
            except Exception as error:
                print("Error closing mailbox: {}".format(error))
        try:
            imap.logout()
        except Exception as error:
            print("Error during IMAP logout: {}".format(error))

    @staticmethod
    def show_folders(imap) -> List[str]:
        """Return the folder names of the mailbox.

        For ``IMAPClient`` objects the last element of each ``list_folders``
        entry is used. For imaplib-style objects every LIST line is parsed;
        the name is returned exactly as sent by the server (quotes included)
        and lines that cannot be parsed are skipped.
        """
        if isinstance(imap, IMAPClient):
            return [entry[-1] for entry in imap.list_folders()]
        folders = []
        for raw_line in imap.list()[1]:
            if not isinstance(raw_line, bytes):
                # None (no folder) or a literal tuple: nothing usable here.
                continue
            parsed = MailReader._LIST_LINE.match(raw_line.decode())
            if parsed:
                folders.append(parsed.group("name"))
        return folders

    def read_emails(self, mails_messages: List["MailPojo"]) -> List["MailPojo"]:
        """Read the mailbox and append the mails found to ``mails_messages``.

        GMX / inbox.lv mailboxes go through the proxy (with retries) when a
        proxy is configured; every other mailbox uses ``create_imap``.

        Returns:
            The mails collected for this mailbox.
        """
        if is_proxy_account(self.login) and self.proxy is not None:
            return self._read_emails_with_proxy_retry(mails_messages)
        return self._read_emails_internal(create_imap(self.login), mails_messages)

    def _read_emails_with_proxy_retry(
            self,
            mails_messages: List["MailPojo"],
            max_retries: int = 8,
    ) -> List["MailPojo"]:
        """Read the mailbox through ``ProxyIMAPClient``, retrying on failure.

        A new connection is opened for each attempt, up to ``max_retries``
        attempts. When all of them fail the login is appended to
        ``failed_gmx_list`` and an empty list is returned.
        """
        imap_server = get_imap_server(self.login)
        last_error: Optional[Exception] = None
        for attempt in range(1, max_retries + 1):
            try:
                print("[Proxy] {}{} via {} (tentative {}/{})".format(
                    self.login, imap_server, self.proxy, attempt, max_retries))
                imap = ProxyIMAPClient(
                    host=imap_server,
                    proxy=self.proxy,
                    use_uid=True,
                    ssl=True,
                    timeout=IMAP_SOCKET_TIMEOUT,
                )
                return self._read_emails_internal(imap, mails_messages)
            except Exception as exc:
                last_error = exc
                print("[Proxy] Échec tentative {}/{} pour {} : {}".format(
                    attempt, max_retries, self.login, exc))
        print("[Proxy] Toutes les tentatives ont échoué pour {} : {}".format(
            self.login, last_error))
        self.failed_gmx_list.append(self.login)
        return []

    def _read_emails_internal(self, imap, mails_messages: List["MailPojo"]) -> List["MailPojo"]:
        """Log in, scan every folder and collect the validation mails.

        ``mails_messages`` is only extended once the whole mailbox has been
        read, so a failed attempt never leaves partial results behind. The
        connection is released in all cases.
        """
        is_imap_client = isinstance(imap, IMAPClient)
        mail_list = []
        mailbox_selected = False
        try:
            if is_imap_client:
                dat = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(dat, self.login))
            else:
                responseType, dat = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(responseType, self.login))
            print("read mails from {}".format(self.login))
            for folder in self.show_folders(imap):
                print("folder is {}".format(folder))
                # imaplib returns quoted names such as '"Sent"'; compare
                # without the quotes so those folders are skipped as well.
                bare_name = folder.strip('"') if isinstance(folder, str) else folder
                if bare_name in self._SKIPPED_FOLDERS:
                    continue
                if is_imap_client:
                    mail_list.extend(self._get_messages_from_folder_for_imapclient(imap, folder))
                else:
                    mailbox_selected = True
                    for wanted_subject in (VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN):
                        mail_list.extend(
                            self._get_messages_from_folder(imap, subject=wanted_subject, folder=folder))
        finally:
            # CLOSE is only meaningful for imaplib once a mailbox was selected.
            self._disconnect(imap, close_mailbox=mailbox_selected)
        mails_messages.extend(mail_list)
        return mail_list

    def _get_messages_from_folder(self, imap, subject: str, folder: str = "INBOX") -> List["MailPojo"]:
        """Collect today's validation mails from ``folder`` (imaplib-style client).

        The server-side search filters on ``subject`` and on today's date;
        each hit is fetched and kept when its decoded subject contains one
        of the validation subjects. A message that cannot be processed is
        logged and skipped.
        """
        imap.select(folder)
        mail_messages = []
        search_query = '(SUBJECT "{}" SINCE "{}")'.format(
            subject, datetime.datetime.today().strftime(DATE_FORMAT))
        typ, data = imap.search(None, search_query)
        if not data or not data[0]:
            return mail_messages
        for message_number in data[0].split():
            try:
                res, msg = imap.fetch(message_number.decode("utf-8"), "(RFC822)")
                for response in msg:
                    if not isinstance(response, tuple):
                        continue
                    email_message = email.message_from_bytes(response[1])
                    decoded_subject = self._decode_header_value(email_message["Subject"])
                    from_address = find_from_mail(decode_header(email_message.get("From")))
                    # A missing To header falls back to the mailbox login below.
                    raw_to = email_message.get("To")
                    to_email = find_from_mail(decode_header(raw_to)) if raw_to is not None else None
                    print("Email:", self.login)
                    print("From:", from_address)
                    print("To:", to_email)
                    print("Subject:", decoded_subject)
                    body = self._extract_body(email_message)
                    if (VALIDATION_URL_SUBJECT_FR in decoded_subject
                            or VALIDATION_URL_SUBJECT_EN in decoded_subject):
                        mail = MailPojo(
                            subject=decoded_subject,
                            body=body,
                            from_address=from_address
                        )
                        mail.to_address = to_email if to_email else self.login
                        mail.mail_address = self.login
                        mail_messages.append(mail)
            except Exception as error:
                print("Error processing email: {}".format(error))
        return mail_messages

    def _extract_body(self, email_message: Message) -> str:
        """Concatenate the decoded text/html and text/plain parts of a message.

        Every part is decoded from its transfer encoding (base64,
        quoted-printable, ...) and from its declared charset, so that links
        are not left in their encoded form (for example "=3D" instead of "=").
        Undecodable bytes are dropped; a failing part is logged and skipped.
        """
        chunks = []
        for part in email_message.walk():
            try:
                if part.get_content_type() not in ("text/html", "text/plain"):
                    continue
                payload = part.get_payload(decode=True)
                if payload:
                    chunks.append(self._decode_bytes(
                        payload, part.get_content_charset(), errors="ignore"))
            except Exception as error:
                print("Error extracting body part: {}".format(error))
        return "".join(chunks)

    def _get_messages_from_folder_for_imapclient(self, imap, folder: str = "INBOX") -> List["MailPojo"]:
        """Collect today's validation mails from ``folder`` (IMAPClient).

        All messages received since today are fetched; a message is kept
        when it comes from the Hermes sender or from an outlook/hotmail
        address (forwarded mail) and its subject or body identifies an
        appointment-validation mail.
        """
        mail_messages = []
        today = datetime.datetime.today()
        search_terms = 'SINCE "{}"'.format(today.strftime(DATE_FORMAT))
        print("{}: search terms is {}".format(self.login, search_terms))
        imap.select_folder(folder)
        messages = imap.search(['SINCE', today])
        print("{}: {} messages from our best friend".format(self.login, len(messages)))
        if not messages:
            return mail_messages
        for uid, message_data in imap.fetch(messages, 'RFC822').items():
            try:
                email_message = email.message_from_bytes(message_data[b'RFC822'])
                # Headers may be missing or Header objects: normalise to str
                # so the substring tests below cannot raise.
                raw_from = email_message.get('FROM')
                raw_subject = email_message.get('subject')
                from_address = "" if raw_from is None else str(raw_from)
                subject = "" if raw_subject is None else str(raw_subject)
                is_expected_sender = (HERMES_EMAIL in from_address
                                      or "outlook.com" in from_address
                                      or "hotmail" in from_address)
                if not is_expected_sender:
                    continue
                body = self._extract_body_for_imapclient(email_message)
                is_validation_mail = (VALIDATION_URL_SUBJECT_FR in subject
                                      or VALIDATION_URL_SUBJECT_EN in subject
                                      or "Votre=20demande=20de=20rendez-vous" in subject
                                      or "Votre demande de rendez-vous" in body)
                if not is_validation_mail:
                    continue
                mail = MailPojo(
                    subject=subject,
                    body=body,
                    from_address=from_address
                )
                mail.isImapClient = True
                print("email is {}".format(self.login))
                print("body is {}".format(body))
                print("subject is {}".format(subject))
                if len(mail.to_address) == 0:
                    if "outlook.com" in from_address or "hotmail.com" in from_address:
                        # Forwarded mail: the sender is the original recipient.
                        mail.to_address = extract_email_from_from_address(from_address)
                    else:
                        mail.to_address = self.login
                mail_messages.append(mail)
            except Exception as error:
                print("Error trying to read email_Message for {}: {}".format(self.login, error))
        return mail_messages

    def _extract_body_for_imapclient(self, email_message: Message) -> str:
        """Return the decoded text body; same extraction as ``_extract_body``."""
        return self._extract_body(email_message)
# Helper functions for processing the collected mails
def find_item_by_url(url: str, successful_items) -> Union[None, "ReserveResultPojo"]:
    """Find the reservation result whose id appears in a validation URL.

    The id is the sixth "/"-separated segment of ``url`` (the segment that
    follows ".../client/register/") and must be exactly 6 characters long.

    Args:
        url: validation URL extracted from a mail body.
        successful_items: iterable of objects exposing an ``id`` attribute.

    Returns:
        The first item whose ``id`` equals the URL id, or ``None`` when the
        URL is too short, the id has the wrong length or nothing matches.
    """
    print("url is :" + url)
    parts = url.split('/')
    # A malformed URL yields no id instead of raising IndexError.
    _id = parts[5] if len(parts) > 5 else ""
    if len(_id) != 6:
        return None
    return next((item for item in successful_items if item.id == _id), None)
def need_to_valid_url(url: str, item: Union["ReserveResultPojo", None]) -> bool:
    """Tell whether a validation URL still has to be visited.

    Args:
        url: validation URL; its sixth "/"-separated segment is the
            reservation id and must be exactly 6 characters long.
        item: reservation result matching the URL, or ``None`` when unknown.

    Returns:
        ``False`` when the id is missing or malformed, or when ``item`` is
        already marked as validated; ``True`` otherwise (unknown item, or
        ``url_validated`` being ``None`` / ``False``).
    """
    print("url is :" + url)
    parts = url.split('/')
    # A malformed URL is reported as an invalid id instead of raising IndexError.
    _id = parts[5] if len(parts) > 5 else ""
    if len(_id) != 6:
        print("id not valid:{}".format(_id))
        return False
    return not (item and item.url_validated)
def need_to_check_email(mail: str, successful_items) -> bool:
    """Tell whether the mailbox ``mail`` still has to be read.

    The mailbox is skipped only when at least one successful item for this
    address has ``url_validated`` set to ``True``.
    """
    print("successful_items size is " + str(len(successful_items)))
    already_validated = 0
    for entry in successful_items:
        if entry.email != mail:
            continue
        if entry.url_validated is True:
            already_validated += 1
    return already_validated == 0
def find_links_to_validate_from_mail_list(
        mail_list: List["MailAddress"],
        logger,
        proxy: Optional["ProxyConfig"] = None,
) -> List[str]:
    """Read the given mailboxes and store the validation links found.

    Mailboxes that already have a validated item for the day are skipped.
    The others are read concurrently; every validation URL found in a mail
    body is saved through ``MONGO_STORE_MANAGER`` unless the matching item
    is already validated.

    Args:
        mail_list: mailboxes to read (objects exposing ``mail`` and ``password``).
        logger: logger receiving the per-link decisions.
        proxy: proxy configuration forwarded to every ``MailReader``.

    Returns:
        The logins of the proxy (GMX / inbox.lv) mailboxes that could not be
        read; an empty list when ``mail_list`` is empty.
    """
    # Before Python 3.11 Future.result() raises concurrent.futures.TimeoutError,
    # which is not the builtin TimeoutError; catch both.
    from concurrent.futures import TimeoutError as FutureTimeoutError

    if not mail_list:
        return []
    contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list()
    successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    mails_messages = []
    failed_gmx: List[str] = []
    # NOTE: leaving the "with" block waits for every worker, so FUTURE_TIMEOUT
    # only bounds the wait on each result, not the total run time.
    with ThreadPoolExecutor(max_workers=100) as executor:
        # Keep each future paired with its mailbox so errors name the right one.
        submitted = []
        for mail in mail_list:
            if need_to_check_email(mail.mail, successful_items):
                mail_reader = MailReader(mail.mail, mail.password, proxy=proxy,
                                         failed_gmx_list=failed_gmx)
                submitted.append((mail, executor.submit(mail_reader.read_emails, mails_messages)))
        for submitted_mail, future in submitted:
            try:
                future.result(timeout=FUTURE_TIMEOUT)
            except (FutureTimeoutError, TimeoutError):
                print("⏱️ Timeout ({} s) dépassé pour une boîte mail — lecture ignorée.".format(FUTURE_TIMEOUT))
            except Exception as e:
                # The password is deliberately not logged.
                print("Error processing mail: {},login: {}".format(e, submitted_mail.mail))
    # Summary of the proxy mailboxes that could not be read
    if failed_gmx:
        print("\n[Proxy] ⚠️ {} compte(s) non lus (GMX / inbox.lv) :".format(len(failed_gmx)))
        for addr in failed_gmx:
            print("{}".format(addr))
    else:
        print("\n[Proxy] ✅ Tous les comptes GMX / inbox.lv ont été lus avec succès.")
    # Reload the successful items: they may have changed while reading.
    _refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    for mail in mails_messages:
        match = re.search(VALIDATION_URL_REGEX, mail.body)
        if not match:
            continue
        url = match.group(0)
        _item = find_item_by_url(url, _refreshed_successful_items)
        if need_to_valid_url(url, _item):
            logger.info("need to validate url: " + url)
            _model = ""
            _used_ip = ""
            if _item:
                _model = _item.model
                _used_ip = _item.current_ip
            MONGO_STORE_MANAGER.save_links_to_validate(
                url,
                mail.to_address,
                model=_model,
                _all_contact_list=contact_to_book_list,
                _used_ip=_used_ip)
        else:
            logger.info("do not need to click url --> {}".format(mail.mail_address))
    return failed_gmx
# Script entry point
if __name__ == '__main__':
    # Contacts to book, read from the dated contact sheet
    # (point file_name at another sheet to process a different day).
    contact_to_book_list = read_contacts(
        file_name="~/Desktop/contact_list_2026-04-07.xlsx")
    # Every destination mailbox known to the store
    all_mail_list = MONGO_STORE_MANAGER.get_destination_emails()
    # Keep the mailboxes that belong to one of the contacts, in contact order
    mail_list_to_check = [
        mail
        for contact in contact_to_book_list
        for mail in all_mail_list
        if contact.mail == mail.mail
    ]
    logger = logging.getLogger()
    # Links already stored for validation
    _all_links = MONGO_STORE_MANAGER.get_links_to_validate()
    # Drop the mailboxes that already have a stored link
    filter_mail = [
        mail_pojo
        for mail_pojo in mail_list_to_check
        if not any(_link.email == mail_pojo.mail for _link in _all_links)
    ]
    # Reading mode: GMX_ONLY=true restricts the run to GMX mailboxes
    gmx_only = os.environ.get("GMX_ONLY", "false").strip().lower() == "true"
    if gmx_only:
        filter_mail = [m for m in filter_mail if is_gmx_account(m.mail)]
        mode_message = "[Mode] Lecture GMX uniquement ({} comptes)"
    else:
        mode_message = "[Mode] Lecture de tous les comptes ({} comptes)"
    print(mode_message.format(len(filter_mail)))
    # Proxy settings come from the environment (used for GMX mailboxes)
    proxy_host = os.environ.get("GMX_PROXY_HOST", "")
    proxy_port = int(os.environ.get("GMX_PROXY_PORT", "443"))
    proxy_kind = os.environ.get("GMX_PROXY_TYPE", "SOCKS5")
    gmx_proxy = ProxyConfig(
        host=proxy_host,
        port=proxy_port,
        proxy_type=proxy_kind,
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )
    # Read the mailboxes and store the links to validate
    failed = find_links_to_validate_from_mail_list(filter_mail, logger, proxy=gmx_proxy)
    # Report the GMX mailboxes that could not be read
    if not failed:
        print("\n===== Tous les comptes GMX ont été lus avec succès =====")
    else:
        print("\n===== Comptes GMX non lus ({}) =====".format(len(failed)))
        for addr in failed:
            print("{}".format(addr))