""" imap_proxy_reader.py ==================== Lire des emails via IMAPClient en passant par un proxy SOCKS5/SOCKS4/HTTP. Fonctionnement : - ProxyIMAP4_TLS : sous-classe de imaplib.IMAP4 qui ouvre la socket à travers un proxy SOCKS via PySocks. - ProxyIMAPClient : sous-classe de IMAPClient qui injecte ProxyIMAP4_TLS au lieu de la connexion directe habituelle. Dépendances : pip install imapclient PySocks """ import datetime import email import imaplib import io import logging import os import re import ssl import socket from dataclasses import dataclass, field from email.message import Message from typing import List, Optional, Tuple import socks from dotenv import load_dotenv from imapclient import IMAPClient load_dotenv() # ────────────────────────────────────────────────────────────── # Constantes # ────────────────────────────────────────────────────────────── VALIDATION_URL_SUBJECT_FR = "Validation de votre demande de rendez-vous" VALIDATION_URL_SUBJECT_EN = "Please confirm your appointment request" VALIDATION_URL_REGEX = ( r"https:\/\/rendezvousparis\.hermes\.com" r"\/client\/register\/[A-Z0-9]+\/validate\.code=[A-Z0-9]+" ) DATE_FORMAT = "%d-%b-%Y" # Correspondance domaine → serveur IMAP (identique à mail_constants.py) IMAP_SERVER_MAP: List[Tuple[str, str]] = [ ("163.com", "imap.163.com"), ("yahoo.com", "imap.mail.yahoo.com"), ("firemail.de", "imap.firemail.de"), ("gmail.com", "imap.gmail.com"), ("sina.com", "imap.sina.com"), ("hotmail.com", "outlook.office365.com"), ("outlook.com", "outlook.office365.com"), ("rambler.ru", "imap.rambler.ru"), ("btvm.ne.jp", "imap.btvm.ne.jp"), ("mars.dti.ne.jp", "imap.cm.dream.jp"), ("aurora.dti.ne.jp", "imap.cm.dream.jp"), ("naver.com", "imap.naver.com"), ("onet.pl", "imap.poczta.onet.pl"), ("gazeta.pl", "imap.gazeta.pl"), ("tim.it", "imap.tim.it"), ("alice.it", "in.alice.it"), ("gmx.com", "imap.gmx.com"), ("gmx.fr", "imap.gmx.com"), ("gmx.us", "imap.gmx.com"), ("gmx.ch", "imap.gmx.com"), ("gmx.pt", "imap.gmx.com"), ("gmx.sg", 
"imap.gmx.com"), ("gmx.net", "imap.gmx.net"), ("gmx.de", "imap.gmx.net"), ("gmx.at", "imap.gmx.at"), ("web.de", "imap.web.de"), ("inbox.lv", "mail.inbox.lv"), ("pissmail.com", "mail.pissmail.com"), ("incel.email", "mail.pissmail.com"), ("shitposting.expert","mail.pissmail.com"), ("hatesje.ws", "mail.pissmail.com"), ("child.pizza", "mail.pissmail.com"), ("genocide.fun", "mail.pissmail.com"), ("dmc.chat", "mail.pissmail.com"), ("aol.com", "imap.aol.com"), # fallback AOL ] PROXY_TYPE_MAP = { "SOCKS5": socks.SOCKS5, "SOCKS4": socks.SOCKS4, "HTTP": socks.HTTP, } logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # Modèles de données # ────────────────────────────────────────────────────────────── @dataclass class ProxyConfig: """Configuration du proxy.""" host: str port: int proxy_type: str = "SOCKS5" # "SOCKS5" | "SOCKS4" | "HTTP" username: Optional[str] = None password: Optional[str] = None @property def socks_type(self) -> int: t = self.proxy_type.upper() if t not in PROXY_TYPE_MAP: raise ValueError(f"proxy_type invalide : {self.proxy_type!r}. " f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}") return PROXY_TYPE_MAP[t] def __repr__(self) -> str: auth = f"{self.username}:***@" if self.username else "" return f"{self.proxy_type}://{auth}{self.host}:{self.port}" @dataclass class MailAccount: """Compte email à lire.""" login: str password: str @dataclass class MailResult: """Résultat d'une lecture d'email.""" account: str subject: str from_address: str to_address: str body: str validation_urls: List[str] = field(default_factory=list) # ────────────────────────────────────────────────────────────── # Connexion IMAP via proxy (bas niveau) # ────────────────────────────────────────────────────────────── class ProxyIMAP4_TLS(imaplib.IMAP4): """ Variante TLS de imaplib.IMAP4 qui route la connexion à travers un proxy SOCKS5/SOCKS4/HTTP grâce à PySocks. 
""" def __init__( self, host: str, port: int, ssl_context: Optional[ssl.SSLContext], proxy: ProxyConfig, timeout: Optional[float] = None, ): self._ssl_context = ssl_context self._proxy = proxy self._timeout = timeout # imaplib.IMAP4.__init__ appelle self.open() imaplib.IMAP4.__init__(self, host, port) self.file: io.BufferedReader def open(self, host: str = "", port: int = 993, timeout: Optional[float] = None) -> None: self.host = host self.port = port effective_timeout = timeout if timeout is not None else self._timeout # ── Créer la socket SOCKS ──────────────────────────── sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) sock.set_proxy( proxy_type=self._proxy.socks_type, addr=self._proxy.host, port=self._proxy.port, username=self._proxy.username, password=self._proxy.password, ) if effective_timeout: sock.settimeout(effective_timeout) sock.connect((host, port)) # ── Envelopper avec SSL/TLS ────────────────────────── ctx = self._ssl_context or ssl.create_default_context() self.sock = ctx.wrap_socket(sock, server_hostname=host) self.file = self.sock.makefile("rb") # ── Méthodes requises par imaplib.IMAP4 ───────────────── def read(self, size: int) -> bytes: return self.file.read(size) # type: ignore[return-value] def readline(self) -> bytes: return self.file.readline() # type: ignore[return-value] def send(self, data) -> None: self.sock.sendall(data) def shutdown(self) -> None: imaplib.IMAP4.shutdown(self) # ────────────────────────────────────────────────────────────── # IMAPClient avec proxy # ────────────────────────────────────────────────────────────── class ProxyIMAPClient(IMAPClient): """ Sous-classe d'IMAPClient qui utilise un proxy SOCKS/HTTP. 
Usage : proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5") client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True) client.login("user@gmail.com", "password") """ def __init__(self, host: str, proxy: ProxyConfig, **kwargs): self._proxy = proxy super().__init__(host, **kwargs) def _create_IMAP4(self): """Remplace la méthode d'IMAPClient pour injecter ProxyIMAP4_TLS.""" if self.ssl: return ProxyIMAP4_TLS( host=self.host, port=self.port, ssl_context=self.ssl_context, proxy=self._proxy, timeout=getattr(self._timeout, "connect", None), ) # Connexion non-SSL à travers le proxy (rare, mais supporté) # On monkey-patch juste la connexion TCP raise NotImplementedError( "Connexion IMAP non-SSL via proxy non implémentée. " "Utilisez ssl=True (port 993)." ) # ────────────────────────────────────────────────────────────── # Fonctions utilitaires # ────────────────────────────────────────────────────────────── def get_imap_server(login: str) -> str: """Retourne le serveur IMAP correspondant au domaine du login.""" login_lower = login.lower() for domain, server in IMAP_SERVER_MAP: if domain in login_lower: return server return "imap.aol.com" # fallback def extract_body(email_message: Message) -> str: """Extrait le corps HTML ou texte d'un email.""" body = "" for part in email_message.walk(): content_type = part.get_content_type() try: if content_type == "text/html": payload = part.get_payload(decode=True) if payload: body += payload.decode("utf-8", errors="ignore") elif content_type == "text/plain": payload = part.get_payload() if payload: body += str(payload) except Exception as exc: logger.warning("Erreur extraction body : %s", exc) return body def find_validation_urls(text: str) -> List[str]: """Recherche toutes les URLs de validation Hermes dans un texte.""" return re.findall(VALIDATION_URL_REGEX, text) # ────────────────────────────────────────────────────────────── # Lecteur principal # 
# ──────────────────────────────────────────────────────────────

from email.errors import MessageError
from email.header import decode_header, make_header


def _header_to_str(value) -> str:
    """Return a header value as plain text, decoding RFC 2047 encoded-words.

    ``Message.get`` may hand back a ``str`` still containing
    ``=?charset?...?=`` sequences or an ``email.header.Header`` object;
    neither can be compared reliably with plain substrings. Falls back to
    ``str(value)`` when the header cannot be decoded.
    """
    if value is None:
        return ""
    try:
        return str(make_header(decode_header(value)))
    except (LookupError, UnicodeError, ValueError, MessageError):
        return str(value)


class ProxyMailReader:
    """
    Read the emails of one account through IMAPClient via a proxy.

    Parameters
    ----------
    account : MailAccount
        Credentials of the email account.
    proxy : ProxyConfig
        Proxy configuration.
    timeout : float, optional
        Connection timeout in seconds (default: 30 s).
    """

    def __init__(
        self,
        account: MailAccount,
        proxy: ProxyConfig,
        timeout: float = 30.0,
    ):
        self.account = account
        self.proxy = proxy
        self.timeout = timeout

    # ── Connection ───────────────────────────────────────────

    def _connect(self) -> ProxyIMAPClient:
        """Open a proxied TLS connection and log in.

        If the login fails the connection is shut down before the
        exception propagates, so the socket is not leaked.
        """
        imap_server = get_imap_server(self.account.login)
        logger.info(
            "[%s] Connexion via %s → %s:993",
            self.account.login, self.proxy, imap_server,
        )
        client = ProxyIMAPClient(
            host=imap_server,
            proxy=self.proxy,
            use_uid=True,
            ssl=True,
            timeout=self.timeout,
        )
        try:
            client.login(self.account.login, self.account.password)
        except Exception:
            try:
                client.shutdown()
            except Exception as close_exc:
                logger.debug("[%s] Fermeture de la connexion échouée : %s",
                             self.account.login, close_exc)
            raise
        logger.info("[%s] Connecté.", self.account.login)
        return client

    # ── Folder listing ───────────────────────────────────────

    def _list_folders(self, client: ProxyIMAPClient) -> List[str]:
        """Return the last element (the folder name) of each list_folders() entry."""
        return [info[-1] for info in client.list_folders()]

    # ── Message reading ──────────────────────────────────────

    def _parse_message(self, raw: bytes) -> Optional[MailResult]:
        """Build a MailResult from a raw message, or None if it is not a
        validation email (matched on subject or on the sender address)."""
        em = email.message_from_bytes(raw)
        subject = _header_to_str(em.get("Subject", ""))
        from_addr = _header_to_str(em.get("From", ""))
        to_addr = _header_to_str(em.get("To", self.account.login))

        # Filter: keep only the Hermes validation emails.
        is_validation = (
            VALIDATION_URL_SUBJECT_FR in subject
            or VALIDATION_URL_SUBJECT_EN in subject
            or "no-reply@hermes.com" in from_addr.lower()
        )
        if not is_validation:
            return None

        body = extract_body(em)
        return MailResult(
            account=self.account.login,
            subject=subject,
            from_address=from_addr,
            to_address=to_addr,
            body=body,
            validation_urls=find_validation_urls(body),
        )

    def _read_folder(
        self,
        client: ProxyIMAPClient,
        folder: str,
        since: Optional[datetime.datetime] = None,
    ) -> List[MailResult]:
        """Return the validation emails found in ``folder`` since ``since``.

        ``since`` defaults to today. Any failure to select, search or fetch
        the folder is logged and yields the results collected so far, so
        one bad folder does not abort the whole account.
        """
        results: List[MailResult] = []
        since = since or datetime.datetime.today()

        try:
            client.select_folder(folder, readonly=True)
        except Exception as exc:
            logger.warning("[%s] Impossible d'ouvrir '%s' : %s",
                           self.account.login, folder, exc)
            return results

        try:
            uids = client.search(["SINCE", since])
        except Exception as exc:
            logger.warning("[%s] Recherche échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        if not uids:
            return results

        logger.info("[%s] %d message(s) dans '%s'",
                    self.account.login, len(uids), folder)

        try:
            fetched = client.fetch(uids, "RFC822")
        except Exception as exc:
            logger.warning("[%s] Récupération échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        for uid, msg_data in fetched.items():
            try:
                raw = msg_data.get(b"RFC822") or msg_data.get("RFC822")
                if raw is None:
                    continue
                result = self._parse_message(raw)
                if result is None:
                    continue
                results.append(result)
                logger.info(
                    "[%s] Email de validation trouvé (uid=%s) — URLs : %s",
                    self.account.login, uid, result.validation_urls or "aucune",
                )
            except Exception as exc:  # one malformed message must not stop the rest
                logger.warning(
                    "[%s] Erreur traitement uid=%s : %s",
                    self.account.login, uid, exc,
                )

        return results

    # ── Public entry point ───────────────────────────────────

    def read(
        self,
        since: Optional[datetime.datetime] = None,
        skip_folders: Optional[List[str]] = None,
    ) -> List[MailResult]:
        """
        Connect to the IMAP server through the proxy and return the
        validation emails found since `since` (today by default).

        Parameters
        ----------
        since        : datetime, optional — start date of the search
        skip_folders : list[str], optional — folders to ignore, matched by
                       exact name
                       (default: ["Sent", "Drafts", "Trash", "Junk", "Spam"])

        Connection and login errors propagate to the caller.
        """
        if skip_folders is None:
            skip_folders = ["Sent", "Drafts", "Trash", "Junk", "Spam"]

        all_results: List[MailResult] = []
        client = self._connect()
        try:
            folders = self._list_folders(client)
            logger.info("[%s] Dossiers : %s", self.account.login, folders)

            for folder in folders:
                if folder in skip_folders:
                    logger.debug("[%s] Dossier ignoré : %s",
                                 self.account.login, folder)
                    continue
                all_results.extend(self._read_folder(client, folder, since))
        finally:
            # Best-effort logout: a failure here must not mask the results
            # or the original exception, but it is still worth a trace.
            try:
                client.logout()
            except Exception as exc:
                logger.debug("[%s] Déconnexion échouée : %s",
                             self.account.login, exc)

        return all_results


# ──────────────────────────────────────────────────────────────
# Parallel reading of several accounts
# ──────────────────────────────────────────────────────────────

from concurrent.futures import ThreadPoolExecutor, as_completed


def read_multiple_accounts(
    accounts: List[MailAccount],
    proxy: ProxyConfig,
    since: Optional[datetime.datetime] = None,
    max_workers: int = 10,
    timeout: float = 30.0,
) -> List[MailResult]:
    """
    Read several email accounts in parallel through the same proxy.

    Returns the consolidated list of every MailResult found. An account
    whose read raises is logged and skipped; the other accounts are still
    returned. Results are appended in completion order.
    """
    all_results: List[MailResult] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(ProxyMailReader(acc, proxy, timeout).read, since): acc.login
            for acc in accounts
        }
        for future in as_completed(future_map):
            login = future_map[future]
            try:
                results = future.result()
            except Exception as exc:
                logger.error("[%s] Erreur : %s", login, exc)
            else:
                logger.info("[%s] %d email(s) de validation récupéré(s).",
                            login, len(results))
                all_results.extend(results)

    return all_results


# ──────────────────────────────────────────────────────────────
# Entry point — usage example
# ──────────────────────────────────────────────────────────────

def main() -> None:
    """Example run: read one account configured through the environment.

    Environment variables:
        GMX_PROXY_HOST, GMX_PROXY_PORT (default 443),
        GMX_PROXY_TYPE (default SOCKS5),
        GMX_PROXY_USERNAME, GMX_PROXY_PASSWORD,
        GMX_MAIL_LOGIN, GMX_MAIL_PASSWORD.
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%H:%M:%S",
    )

    # ── 1. Configure the proxy ───────────────────────────────
    proxy_host = os.environ.get("GMX_PROXY_HOST", "")
    if not proxy_host:
        raise SystemExit("Variable d'environnement GMX_PROXY_HOST manquante.")
    proxy = ProxyConfig(
        host=proxy_host,
        port=int(os.environ.get("GMX_PROXY_PORT", "443")),
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )

    # ── 2. Define the accounts to read ───────────────────────
    # Credentials come from the environment and must never be committed.
    # NOTE(review): an account password used to be hard-coded here; treat
    # it as compromised and rotate it.
    mail_login = os.environ.get("GMX_MAIL_LOGIN")
    mail_password = os.environ.get("GMX_MAIL_PASSWORD")
    if not mail_login or not mail_password:
        raise SystemExit(
            "Variables d'environnement GMX_MAIL_LOGIN / GMX_MAIL_PASSWORD manquantes."
        )
    accounts = [MailAccount(login=mail_login, password=mail_password)]

    # ── 3. Run the read ──────────────────────────────────────
    results = read_multiple_accounts(
        accounts=accounts,
        proxy=proxy,
        since=datetime.datetime.today(),
        max_workers=5,
        timeout=30.0,
    )

    # ── 4. Print the results ─────────────────────────────────
    print(f"\n{'='*60}")
    print(f" {len(results)} email(s) de validation trouvé(s)")
    print(f"{'='*60}\n")
    for r in results:
        print(f" Compte : {r.account}")
        print(f" De : {r.from_address}")
        print(f" Sujet : {r.subject}")
        print(f" URLs : {r.validation_urls or 'aucune'}")
        print(f" {'-'*56}")


if __name__ == "__main__":
    main()