""" imap_proxy_reader.py ==================== Lire des emails via IMAPClient en passant par un proxy SOCKS5/SOCKS4/HTTP. Fonctionnement : - ProxyIMAP4_TLS : sous-classe de imaplib.IMAP4 qui ouvre la socket à travers un proxy SOCKS via PySocks. - ProxyIMAPClient : sous-classe de IMAPClient qui injecte ProxyIMAP4_TLS au lieu de la connexion directe habituelle. Dépendances : pip install imapclient PySocks """ import datetime import email import imaplib import io import logging import os import re import ssl import socket from dataclasses import dataclass, field from email.message import Message from typing import List, Optional, Tuple import random import socks from dotenv import load_dotenv from imapclient import IMAPClient load_dotenv() # ────────────────────────────────────────────────────────────── # Constantes # ────────────────────────────────────────────────────────────── VALIDATION_URL_SUBJECT_FR = "Validation de votre demande de rendez-vous" VALIDATION_URL_SUBJECT_EN = "Please confirm your appointment request" VALIDATION_URL_REGEX = ( r"https:\/\/rendezvousparis\.hermes\.com" r"\/client\/register\/[A-Z0-9]+\/validate[?.]code=[A-Z0-9]+" ) DATE_FORMAT = "%d-%b-%Y" # Correspondance domaine → serveur IMAP (identique à mail_constants.py) IMAP_SERVER_MAP: List[Tuple[str, str]] = [ ("163.com", "imap.163.com"), ("yahoo.com", "imap.mail.yahoo.com"), ("firemail.de", "imap.firemail.de"), ("gmail.com", "imap.gmail.com"), ("sina.com", "imap.sina.com"), ("hotmail.com", "outlook.office365.com"), ("outlook.com", "outlook.office365.com"), ("rambler.ru", "imap.rambler.ru"), ("btvm.ne.jp", "imap.btvm.ne.jp"), ("mars.dti.ne.jp", "imap.cm.dream.jp"), ("aurora.dti.ne.jp", "imap.cm.dream.jp"), ("naver.com", "imap.naver.com"), ("onet.pl", "imap.poczta.onet.pl"), ("gazeta.pl", "imap.gazeta.pl"), ("tim.it", "imap.tim.it"), ("alice.it", "in.alice.it"), ("gmx.com", "imap.gmx.com"), ("gmx.fr", "imap.gmx.com"), ("gmx.us", "imap.gmx.com"), ("gmx.ch", "imap.gmx.com"), ("gmx.pt", "imap.gmx.com"), ("gmx.sg", "imap.gmx.com"), ("gmx.net", "imap.gmx.net"), ("gmx.de", "imap.gmx.net"), ("gmx.at", "imap.gmx.at"), ("web.de", "imap.web.de"), ("inbox.lv", "mail.inbox.lv"), ("pissmail.com", "mail.pissmail.com"), ("incel.email", "mail.pissmail.com"), ("shitposting.expert","mail.pissmail.com"), ("hatesje.ws", "mail.pissmail.com"), ("child.pizza", "mail.pissmail.com"), ("genocide.fun", "mail.pissmail.com"), ("dmc.chat", "mail.pissmail.com"), ("aol.com", "imap.aol.com"), # fallback AOL ] PROXY_TYPE_MAP = { "SOCKS5": socks.SOCKS5, "SOCKS4": socks.SOCKS4, "HTTP": socks.HTTP, } logger = logging.getLogger(__name__) # ────────────────────────────────────────────────────────────── # Modèles de données # ────────────────────────────────────────────────────────────── @dataclass class ProxyConfig: """Configuration du proxy.""" host: str port: int proxy_type: str = "SOCKS5" # "SOCKS5" | "SOCKS4" | "HTTP" username: Optional[str] = None password: Optional[str] = None @property def socks_type(self) -> int: t = self.proxy_type.upper() if t not in PROXY_TYPE_MAP: raise ValueError(f"proxy_type invalide : {self.proxy_type!r}. " f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}") return PROXY_TYPE_MAP[t] def __repr__(self) -> str: auth = f"{self.username}:***@" if self.username else "" return f"{self.proxy_type}://{auth}{self.host}:{self.port}" @dataclass class MailAccount: """Compte email à lire.""" login: str password: str @dataclass class MailResult: """Résultat d'une lecture d'email.""" account: str subject: str from_address: str to_address: str body: str validation_urls: List[str] = field(default_factory=list) # ────────────────────────────────────────────────────────────── # Connexion IMAP via proxy (bas niveau) # ────────────────────────────────────────────────────────────── class ProxyIMAP4_TLS(imaplib.IMAP4): """ Variante TLS de imaplib.IMAP4 qui route la connexion à travers un proxy SOCKS5/SOCKS4/HTTP grâce à PySocks. """ def __init__( self, host: str, port: int, ssl_context: Optional[ssl.SSLContext], proxy: ProxyConfig, timeout: Optional[float] = None, ): self._ssl_context = ssl_context self._proxy = proxy self._timeout = timeout # imaplib.IMAP4.__init__ appelle self.open() imaplib.IMAP4.__init__(self, host, port) self.file: io.BufferedReader def open(self, host: str = "", port: int = 993, timeout: Optional[float] = None) -> None: self.host = host self.port = port effective_timeout = timeout if timeout is not None else self._timeout # ── Créer la socket SOCKS ──────────────────────────── sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) sock.set_proxy( proxy_type=self._proxy.socks_type, addr=self._proxy.host, port=self._proxy.port, username=self._proxy.username, password=self._proxy.password, ) if effective_timeout: sock.settimeout(effective_timeout) sock.connect((host, port)) # ── Envelopper avec SSL/TLS ────────────────────────── ctx = self._ssl_context or ssl.create_default_context() self.sock = ctx.wrap_socket(sock, server_hostname=host) self.file = self.sock.makefile("rb") # ── Méthodes requises par imaplib.IMAP4 ───────────────── def read(self, size: int) -> bytes: return self.file.read(size) # type: ignore[return-value] def readline(self) -> bytes: return self.file.readline() # type: ignore[return-value] def send(self, data) -> None: self.sock.sendall(data) def shutdown(self) -> None: imaplib.IMAP4.shutdown(self) def id(self, parameters: dict) -> tuple: """ Envoie la commande IMAP ID (RFC 2971). parameters : dict ex. {"name": "Thunderbird", "version": "115.0"} Retourne le tuple brut (typ, data) renvoyé par le serveur. """ args = " ".join( '"{}"'.format(str(v).replace('"', '\\"')) for pair in parameters.items() for v in pair ) return self._simple_command("ID", "({})".format(args)) # ────────────────────────────────────────────────────────────── # Profils de clients IMAP réels (pour spoofing du fingerprint) # ────────────────────────────────────────────────────────────── _IMAP_CLIENT_PROFILES = [ # Mozilla Thunderbird 115 (ESR) — Windows { "name": "Thunderbird", "version": "115.9.0", "vendor": "Mozilla", "support-url": "https://support.mozilla.org/", "command": "IMAP4rev1", "os": "Windows NT 10.0", "os-version": "10.0", }, # Mozilla Thunderbird 115 — macOS { "name": "Thunderbird", "version": "115.9.0", "vendor": "Mozilla", "support-url": "https://support.mozilla.org/", "command": "IMAP4rev1", "os": "macOS", "os-version": "14.4", }, # Apple Mail — macOS Sonoma { "name": "Mac OS X Mail", "version": "16.0", "vendor": "Apple Inc.", "support-url": "https://support.apple.com/mail", "os": "Mac OS X", "os-version": "14.4", }, # Apple Mail — iOS { "name": "iPhone Mail", "version": "17.4", "vendor": "Apple Inc.", "os": "iOS", "os-version": "17.4", }, # Outlook pour Windows (MAPI/IMAP bridge) { "name": "Microsoft Outlook", "version": "16.0.17531.20108", "vendor": "Microsoft Corporation", "support-url": "https://support.microsoft.com/outlook", "os": "Windows NT 10.0", "os-version": "10.0", }, ] def _random_imap_id_params() -> dict: """Retourne un profil aléatoire parmi les clients IMAP réels.""" return random.choice(_IMAP_CLIENT_PROFILES) def send_imap_id(imap, params: Optional[dict] = None) -> None: """ Envoie la commande IMAP ID après connexion pour usurper le fingerprint client. Fonctionne avec IMAPClient (imapclient) et imaplib.IMAP4. Paramètres ---------- imap : IMAPClient | imaplib.IMAP4 params : dict, optional — si None, un profil aléatoire est choisi. """ if params is None: params = _random_imap_id_params() try: if isinstance(imap, IMAPClient): # imapclient expose _imap (l'objet imaplib sous-jacent) _raw = imap._imap if hasattr(_raw, "id"): _raw.id(params) else: # Fallback : commande brute via imapclient args = " ".join( '"{}"'.format(str(v).replace('"', '\\"')) for pair in params.items() for v in pair ) imap._imap._simple_command("ID", "({})".format(args)) elif hasattr(imap, "id"): # ProxyIMAP4_TLS ou tout imaplib.IMAP4 patchable imap.id(params) else: # Dernier recours : commande brute imaplib args = " ".join( '"{}"'.format(str(v).replace('"', '\\"')) for pair in params.items() for v in pair ) imap._simple_command("ID", "({})".format(args)) except Exception as exc: logger.debug("IMAP ID non supporté ou ignoré : %s", exc) class ProxyIMAPClient(IMAPClient): """ Sous-classe d'IMAPClient qui utilise un proxy SOCKS/HTTP. Usage : proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5") client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True) client.login("user@gmail.com", "password") """ def __init__(self, host: str, proxy: ProxyConfig, **kwargs): self._proxy = proxy super().__init__(host, **kwargs) def _create_IMAP4(self): """Remplace la méthode d'IMAPClient pour injecter ProxyIMAP4_TLS.""" if self.ssl: # self._timeout peut être un float (secondes) ou un objet avec # un attribut 'connect' (ex : urllib3 Timeout). On gère les deux. _timeout = self._timeout if _timeout is not None and not isinstance(_timeout, (int, float)): _timeout = getattr(_timeout, "connect", None) return ProxyIMAP4_TLS( host=self.host, port=self.port, ssl_context=self.ssl_context, proxy=self._proxy, timeout=_timeout, ) # Connexion non-SSL à travers le proxy (rare, mais supporté) # On monkey-patch juste la connexion TCP raise NotImplementedError( "Connexion IMAP non-SSL via proxy non implémentée. " "Utilisez ssl=True (port 993)." ) def login(self, username: str, password: str): """Surcharge login() pour envoyer IMAP ID juste après l'authentification.""" result = super().login(username, password) send_imap_id(self) return result # ────────────────────────────────────────────────────────────── # Fonctions utilitaires # ────────────────────────────────────────────────────────────── def get_imap_server(login: str) -> str: """Retourne le serveur IMAP correspondant au domaine du login.""" login_lower = login.lower() for domain, server in IMAP_SERVER_MAP: if domain in login_lower: return server return "imap.aol.com" # fallback def extract_body(email_message: Message) -> str: """Extrait le corps HTML ou texte d'un email.""" body = "" for part in email_message.walk(): content_type = part.get_content_type() try: if content_type == "text/html": payload = part.get_payload(decode=True) if payload: body += payload.decode("utf-8", errors="ignore") elif content_type == "text/plain": payload = part.get_payload() if payload: body += str(payload) except Exception as exc: logger.warning("Erreur extraction body : %s", exc) return body def find_validation_urls(text: str) -> List[str]: """Recherche toutes les URLs de validation Hermes dans un texte.""" return re.findall(VALIDATION_URL_REGEX, text) # ────────────────────────────────────────────────────────────── # Lecteur principal # ────────────────────────────────────────────────────────────── class ProxyMailReader: """ Lit les emails d'un compte via IMAPClient en passant par un proxy. Paramètres ---------- account : MailAccount Identifiants du compte email. proxy : ProxyConfig Configuration du proxy. timeout : float, optional Timeout de connexion en secondes (défaut : 30 s). """ def __init__( self, account: MailAccount, proxy: ProxyConfig, timeout: float = 30.0, ): self.account = account self.proxy = proxy self.timeout = timeout # ── Connexion ──────────────────────────────────────────── def _connect(self) -> ProxyIMAPClient: imap_server = get_imap_server(self.account.login) logger.info( "[%s] Connexion via %s → %s:993", self.account.login, self.proxy, imap_server, ) client = ProxyIMAPClient( host=imap_server, proxy=self.proxy, use_uid=True, ssl=True, timeout=self.timeout, ) client.login(self.account.login, self.account.password) logger.info("[%s] Connecté.", self.account.login) return client # ── Lecture des dossiers ───────────────────────────────── def _list_folders(self, client: ProxyIMAPClient) -> List[str]: return [info[-1] for info in client.list_folders()] # ── Lecture des messages ───────────────────────────────── def _read_folder( self, client: ProxyIMAPClient, folder: str, since: Optional[datetime.datetime] = None, ) -> List[MailResult]: results: List[MailResult] = [] since = since or datetime.datetime.today() try: client.select_folder(folder, readonly=True) except Exception as exc: logger.warning("[%s] Impossible d'ouvrir '%s' : %s", self.account.login, folder, exc) return results try: uids = client.search(["SINCE", since]) except Exception as exc: logger.warning("[%s] Recherche échouée dans '%s' : %s", self.account.login, folder, exc) return results if not uids: return results logger.info("[%s] %d message(s) dans '%s'", self.account.login, len(uids), folder) for uid, msg_data in client.fetch(uids, "RFC822").items(): try: raw = msg_data.get(b"RFC822") or msg_data.get("RFC822") if raw is None: continue em = email.message_from_bytes(raw) subject = em.get("Subject", "") from_addr = em.get("From", "") to_addr = em.get("To", self.account.login) # Filtrer : on ne garde que les emails de validation Hermes is_validation = ( VALIDATION_URL_SUBJECT_FR in subject or VALIDATION_URL_SUBJECT_EN in subject or "no-reply@hermes.com" in from_addr.lower() ) if not is_validation: continue body = extract_body(em) urls = find_validation_urls(body) result = MailResult( account=self.account.login, subject=subject, from_address=from_addr, to_address=to_addr, body=body, validation_urls=urls, ) results.append(result) logger.info( "[%s] Email de validation trouvé (uid=%s) — URLs : %s", self.account.login, uid, urls or "aucune", ) except Exception as exc: logger.warning( "[%s] Erreur traitement uid=%s : %s", self.account.login, uid, exc, ) return results # ── Point d'entrée public ──────────────────────────────── def read( self, since: Optional[datetime.datetime] = None, skip_folders: Optional[List[str]] = None, ) -> List[MailResult]: """ Se connecte au serveur IMAP via le proxy et retourne la liste des emails de validation trouvés depuis `since` (aujourd'hui par défaut). Paramètres ---------- since : datetime, optional — date de début de recherche skip_folders : list[str], optional — dossiers à ignorer (défaut : ["Sent", "Drafts", "Trash", "Junk", "Spam"]) """ if skip_folders is None: skip_folders = ["Sent", "Drafts", "Trash", "Junk", "Spam"] all_results: List[MailResult] = [] client = self._connect() try: folders = self._list_folders(client) logger.info("[%s] Dossiers : %s", self.account.login, folders) for folder in folders: if folder in skip_folders: logger.debug("[%s] Dossier ignoré : %s", self.account.login, folder) continue all_results.extend(self._read_folder(client, folder, since)) finally: try: client.logout() except Exception: pass return all_results # ────────────────────────────────────────────────────────────── # Lecture parallèle de plusieurs comptes # ────────────────────────────────────────────────────────────── from concurrent.futures import ThreadPoolExecutor, as_completed def read_multiple_accounts( accounts: List[MailAccount], proxy: ProxyConfig, since: Optional[datetime.datetime] = None, max_workers: int = 10, timeout: float = 30.0, ) -> List[MailResult]: """ Lit plusieurs comptes email en parallèle via le même proxy. Retourne la liste consolidée de tous les MailResult trouvés. """ all_results: List[MailResult] = [] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_map = { executor.submit( ProxyMailReader(acc, proxy, timeout).read, since ): acc.login for acc in accounts } for future in as_completed(future_map): login = future_map[future] try: results = future.result() logger.info("[%s] %d email(s) de validation récupéré(s).", login, len(results)) all_results.extend(results) except Exception as exc: logger.error("[%s] Erreur : %s", login, exc) return all_results # ────────────────────────────────────────────────────────────── # Point d'entrée — exemple d'utilisation # ────────────────────────────────────────────────────────────── if __name__ == "__main__": logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%H:%M:%S", ) # ── 1. Configurer le proxy ─────────────────────────────── proxy = ProxyConfig( host=os.environ.get("GMX_PROXY_HOST", ""), port=int(os.environ.get("GMX_PROXY_PORT", "443")), proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"), username=os.environ.get("GMX_PROXY_USERNAME"), password=os.environ.get("GMX_PROXY_PASSWORD"), ) # ── 2. Définir les comptes à lire ──────────────────────── accounts = [ MailAccount(login="birgitnaya@gmx.net", password="XEeUF3Y1yaO"), # MailAccount(login="user@gmail.com", password="apppassword"), # MailAccount(login="user@outlook.com", password="password"), ] # ── 3. Lancer la lecture ───────────────────────────────── results = read_multiple_accounts( accounts=accounts, proxy=proxy, since=datetime.datetime.today(), max_workers=5, timeout=30.0, ) # ── 4. Afficher les résultats ──────────────────────────── print(f"\n{'='*60}") print(f" {len(results)} email(s) de validation trouvé(s)") print(f"{'='*60}\n") for r in results: print(f" Compte : {r.account}") print(f" De : {r.from_address}") print(f" Sujet : {r.subject}") print(f" URLs : {r.validation_urls or 'aucune'}") print(f" {'-'*56}")