"""
imap_proxy_reader.py
====================
Read emails through IMAPClient over a SOCKS5/SOCKS4/HTTP proxy.

How it works:
  - ProxyIMAP4_TLS  : imaplib.IMAP4 subclass that opens its socket through a
                      proxy with PySocks and then wraps it in TLS.
  - ProxyIMAPClient : IMAPClient subclass that injects ProxyIMAP4_TLS instead
                      of the usual direct connection.

Dependencies:
    pip install imapclient PySocks python-dotenv
"""

import datetime
import email
import hashlib
import imaplib
import io
import logging
import os
import re
import socket
import ssl
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from email.errors import HeaderParseError
from email.header import decode_header, make_header
from email.message import Message
from typing import List, Optional, Tuple

import socks
from dotenv import load_dotenv
from imapclient import IMAPClient

load_dotenv()

# ──────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────

DATE_FORMAT = "%d-%b-%Y"

# Domain → IMAP server mapping (same as mail_constants.py).
# Order matters: the first entry found in the login's domain wins.
IMAP_SERVER_MAP: List[Tuple[str, str]] = [
    ("163.com", "imap.163.com"),
    ("yahoo.com", "imap.mail.yahoo.com"),
    ("firemail.de", "imap.firemail.de"),
    ("gmail.com", "imap.gmail.com"),
    ("sina.com", "imap.sina.com"),
    ("hotmail.com", "outlook.office365.com"),
    ("outlook.com", "outlook.office365.com"),
    ("rambler.ru", "imap.rambler.ru"),
    ("btvm.ne.jp", "imap.btvm.ne.jp"),
    ("mars.dti.ne.jp", "imap.cm.dream.jp"),
    ("aurora.dti.ne.jp", "imap.cm.dream.jp"),
    ("naver.com", "imap.naver.com"),
    ("onet.pl", "imap.poczta.onet.pl"),
    ("gazeta.pl", "imap.gazeta.pl"),
    ("tim.it", "imap.tim.it"),
    ("alice.it", "in.alice.it"),
    ("gmx.com", "imap.gmx.com"),
    ("gmx.fr", "imap.gmx.com"),
    ("gmx.us", "imap.gmx.com"),
    ("gmx.ch", "imap.gmx.com"),
    ("gmx.pt", "imap.gmx.com"),
    ("gmx.sg", "imap.gmx.com"),
    ("gmx.net", "imap.gmx.net"),
    ("gmx.de", "imap.gmx.net"),
    ("gmx.at", "imap.gmx.at"),
    ("web.de", "imap.web.de"),
    ("inbox.lv", "mail.inbox.lv"),
    ("pissmail.com", "mail.pissmail.com"),
    ("incel.email", "mail.pissmail.com"),
    ("shitposting.expert", "mail.pissmail.com"),
    ("hatesje.ws", "mail.pissmail.com"),
    ("child.pizza", "mail.pissmail.com"),
    ("genocide.fun", "mail.pissmail.com"),
    ("dmc.chat", "mail.pissmail.com"),
    ("aol.com", "imap.aol.com"),  # AOL fallback
]

PROXY_TYPE_MAP = {
    "SOCKS5": socks.SOCKS5,
    "SOCKS4": socks.SOCKS4,
    "HTTP": socks.HTTP,
}

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))


# ──────────────────────────────────────────────────────────────
# Data models
# ──────────────────────────────────────────────────────────────

@dataclass
class ProxyConfig:
    """Proxy configuration."""

    host: str
    port: int
    proxy_type: str = "SOCKS5"  # "SOCKS5" | "SOCKS4" | "HTTP" (case-insensitive)
    username: Optional[str] = None
    password: Optional[str] = None

    @property
    def socks_type(self) -> int:
        """Return the PySocks constant for ``proxy_type``.

        Raises:
            ValueError: if ``proxy_type`` is not a key of PROXY_TYPE_MAP.
        """
        t = self.proxy_type.upper()
        if t not in PROXY_TYPE_MAP:
            raise ValueError(
                f"proxy_type invalide : {self.proxy_type!r}. "
                f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}"
            )
        return PROXY_TYPE_MAP[t]

    def __repr__(self) -> str:
        # The password is masked so the config can be logged safely.
        auth = f"{self.username}:***@" if self.username else ""
        return f"{self.proxy_type}://{auth}{self.host}:{self.port}"


@dataclass
class MailAccount:
    """Email account to read."""

    login: str
    password: str


@dataclass
class MailResult:
    """Result of reading one email."""

    account: str
    subject: str
    from_address: str
    to_address: str
    body: str
    message_id: str = ""      # Message-ID header
    validation_url: str = ""  # Validation URL; not populated by this module


# ──────────────────────────────────────────────────────────────
# IMAP connection through a proxy (low level)
# ──────────────────────────────────────────────────────────────

class ProxyIMAP4_TLS(imaplib.IMAP4):
    """
    TLS variant of imaplib.IMAP4 that routes the connection through a
    SOCKS5/SOCKS4/HTTP proxy using PySocks.
    """

    def __init__(
        self,
        host: str,
        port: int,
        ssl_context: Optional[ssl.SSLContext],
        proxy: ProxyConfig,
        timeout: Optional[float] = None,
    ):
        # These must be set before calling the parent constructor, because
        # imaplib.IMAP4.__init__ calls self.open(), which reads them.
        self._ssl_context = ssl_context
        self._proxy = proxy
        self._timeout = timeout
        imaplib.IMAP4.__init__(self, host, port)
        self.file: io.BufferedReader

    def open(self, host: str = "", port: int = 993,
             timeout: Optional[float] = None) -> None:
        """Open a TLS connection to ``host:port`` through the proxy.

        ``timeout`` takes precedence over the timeout given to the
        constructor. The raw socket is closed if anything fails before the
        TLS-wrapped socket has been stored on the instance.
        """
        self.host = host
        self.port = port
        effective_timeout = timeout if timeout is not None else self._timeout

        # ── Create the proxied socket ────────────────────────
        sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.set_proxy(
                proxy_type=self._proxy.socks_type,
                addr=self._proxy.host,
                port=self._proxy.port,
                username=self._proxy.username,
                password=self._proxy.password,
            )
            if effective_timeout:
                sock.settimeout(effective_timeout)
            sock.connect((host, port))

            # ── Wrap with SSL/TLS ────────────────────────────
            ctx = self._ssl_context or ssl.create_default_context()
            self.sock = ctx.wrap_socket(sock, server_hostname=host)
        except BaseException:
            # Do not leak the socket when the proxy/TLS handshake fails.
            sock.close()
            raise

        self.file = self.sock.makefile("rb")

    # ── Methods required by imaplib.IMAP4 ────────────────────

    def read(self, size: int) -> bytes:
        """Read up to ``size`` bytes from the buffered connection file."""
        return self.file.read(size)  # type: ignore[return-value]

    def readline(self) -> bytes:
        """Read one line from the buffered connection file."""
        return self.file.readline()  # type: ignore[return-value]

    def send(self, data) -> None:
        """Send ``data`` in full over the TLS socket."""
        self.sock.sendall(data)

    def shutdown(self) -> None:
        """Close the connection using the base-class implementation."""
        imaplib.IMAP4.shutdown(self)


# ──────────────────────────────────────────────────────────────
# IMAPClient with proxy support
# ──────────────────────────────────────────────────────────────

class ProxyIMAPClient(IMAPClient):
    """
    IMAPClient subclass that connects through a SOCKS/HTTP proxy.

    Usage:
        proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5")
        client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True,
                                 subjects=["Confirmation", "Appointment"])
        client.login("user@gmail.com", "password")

    Extra parameters
    ----------------
    proxy : ProxyConfig
        SOCKS/HTTP proxy configuration.
    subjects : list[str], optional
        Subjects (or substrings) used to filter emails. Available as
        ``client.subjects`` and used by ``search_by_subjects()`` to build
        the IMAP SUBJECT criteria.
    """

    def __init__(
        self,
        host: str,
        proxy: ProxyConfig,
        subjects: Optional[List[str]] = None,
        **kwargs,
    ):
        # Must be set before super().__init__, which calls _create_IMAP4().
        self._proxy = proxy
        # Copy so later mutation of the caller's list does not affect us.
        self.subjects: List[str] = list(subjects) if subjects else []
        super().__init__(host, **kwargs)

    def _create_IMAP4(self):
        """Override IMAPClient's factory to inject ProxyIMAP4_TLS.

        Raises:
            NotImplementedError: when the client was created with ssl=False.
        """
        if not self.ssl:
            # Non-SSL connection through the proxy is not supported.
            raise NotImplementedError(
                "Connexion IMAP non-SSL via proxy non implémentée. "
                "Utilisez ssl=True (port 993)."
            )
        # self._timeout may be a plain number or an object exposing a
        # ``connect`` attribute; fall back to the value itself so a plain
        # float timeout is not silently dropped.
        connect_timeout = getattr(self._timeout, "connect", self._timeout)
        return ProxyIMAP4_TLS(
            host=self.host,
            port=self.port,
            ssl_context=self.ssl_context,
            proxy=self._proxy,
            timeout=connect_timeout,
        )

    def search_by_subjects(
        self,
        since: Optional[datetime.datetime] = None,
        extra_criteria: Optional[List] = None,
    ) -> List[int]:
        """Search the selected folder for messages matching ``self.subjects``.

        Parameters
        ----------
        since : datetime, optional
            Lower date bound for the SINCE criterion (defaults to today).
        extra_criteria : list, optional
            Additional search criteria appended after SINCE.

        Returns the result of ``self.search`` for the combined criteria.
        With no configured subjects only SINCE (+ extras) is used.
        """
        base: List = ["SINCE", since or datetime.datetime.today()]
        if extra_criteria:
            base.extend(extra_criteria)

        if not self.subjects:
            return self.search(base)

        # Build a chained OR as nested lists:
        #   OR SUBJECT "A" (OR SUBJECT "B" SUBJECT "C")
        def _build_or(subjects: List[str]) -> List:
            if len(subjects) == 1:
                return ["SUBJECT", subjects[0]]
            return ["OR", ["SUBJECT", subjects[0]], _build_or(subjects[1:])]

        # Criteria listed side by side are ANDed together by IMAP.
        return self.search(base + _build_or(self.subjects))


# ──────────────────────────────────────────────────────────────
# Utility functions
# ──────────────────────────────────────────────────────────────

def get_imap_server(login: str) -> str:
    """Return the IMAP server for the domain of ``login``.

    Only the part after the last "@" is examined, so a known domain that
    merely appears in the local part cannot select the wrong server. A
    login without "@" is matched as a whole. Falls back to "imap.aol.com".
    """
    domain_part = login.lower().rpartition("@")[2]
    for domain, server in IMAP_SERVER_MAP:
        if domain in domain_part:
            return server
    return "imap.aol.com"  # fallback


def _decode_header(value) -> str:
    """Decode an RFC 2047 encoded header value to text (best effort)."""
    if not value:
        return ""
    try:
        return str(make_header(decode_header(value)))
    except (LookupError, UnicodeError, ValueError, HeaderParseError):
        # Unknown charset or malformed encoded-word: keep the raw value.
        return str(value)


def _decode_part(part: Message) -> str:
    """Return the transfer-decoded text of one MIME part ("" if empty)."""
    payload = part.get_payload(decode=True)
    if not payload:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Charset name unknown to Python: fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def extract_body(email_message: Message) -> str:
    """Concatenate the decoded text/html and text/plain parts of an email."""
    chunks: List[str] = []
    for part in email_message.walk():
        if part.get_content_type() not in ("text/html", "text/plain"):
            continue
        try:
            chunks.append(_decode_part(part))
        except Exception as exc:  # best effort: skip the broken part
            logger.warning("Erreur extraction body : %s", exc)
    return "".join(chunks)


def _dedup_key(result: MailResult) -> tuple:
    """
    Compute a de-duplication key for a MailResult.

    Priority:
      1. validation URL, when set
      2. Message-ID header, when set
      3. MD5 hash of the whitespace-normalised body, as a content fallback
         when both fields above are empty

    The first tuple element names the kind of key so that keys of
    different kinds can never collide.
    """
    if result.validation_url:
        return ("url", result.validation_url)
    if result.message_id:
        return ("msgid", result.message_id)

    # Collapse whitespace runs before hashing to absorb minor formatting
    # differences between copies of the same message.
    normalized = re.sub(r"\s+", " ", result.body).strip()
    body_hash = hashlib.md5(normalized.encode("utf-8", errors="ignore")).hexdigest()
    return ("body", body_hash)


# ──────────────────────────────────────────────────────────────
# Main reader
# ──────────────────────────────────────────────────────────────

class ProxyMailReader:
    """
    Read the emails of one account through IMAPClient over a proxy.

    Parameters
    ----------
    account : MailAccount
        Credentials of the email account.
    proxy : ProxyConfig
        Proxy configuration.
    timeout : float, optional
        Timeout in seconds passed to the IMAP client (default: 30 s).
    subjects : list[str], optional
        Substrings looked for in the Subject header. A message is kept when
        at least one of them occurs in its subject. If None or empty, no
        subject filtering is applied.
    from_addresses : list[str], optional
        Extra sender addresses stored next to "no-reply@hermes.com".
        NOTE(review): the stored list is not used to filter messages
        anywhere in this module.
    max_retries : int
        Maximum number of connection/login attempts.
    retry_delay : float
        Base delay in seconds between attempts; doubled after each failure.
    """

    def __init__(
        self,
        account: MailAccount,
        proxy: ProxyConfig,
        timeout: float = 30.0,
        subjects: Optional[List[str]] = None,
        from_addresses: Optional[List[str]] = None,
        max_retries: int = 8,
        retry_delay: float = 2.0,
    ):
        self.account = account
        self.proxy = proxy
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay

        self._subjects: List[str] = []
        if subjects:
            self._subjects.extend(subjects)

        # Accepted sender addresses
        self._from_addresses: List[str] = ["no-reply@hermes.com"]
        if from_addresses:
            self._from_addresses.extend(from_addresses)

    # ── Connection ───────────────────────────────────────────

    def _connect(self) -> ProxyIMAPClient:
        """Connect and log in, retrying with exponential backoff.

        Raises:
            ConnectionError: when every attempt failed; the last error is
                chained as the cause.
        """
        imap_server = get_imap_server(self.account.login)
        last_exc: Optional[Exception] = None

        for attempt in range(1, self.max_retries + 1):
            logger.info(
                "[%s] Tentative %d/%d — Connexion via %s → %s:993",
                self.account.login, attempt, self.max_retries,
                self.proxy, imap_server,
            )
            client: Optional[ProxyIMAPClient] = None
            try:
                client = ProxyIMAPClient(
                    host=imap_server,
                    proxy=self.proxy,
                    subjects=self._subjects,
                    use_uid=True,
                    ssl=True,
                    timeout=self.timeout,
                )
                client.login(self.account.login, self.account.password)
                logger.info(
                    "[%s] Connecté (tentative %d). Sujets recherchés : %s",
                    self.account.login, attempt, self._subjects,
                )
                return client

            except Exception as exc:
                last_exc = exc
                logger.warning(
                    "[%s] Échec connexion/login (tentative %d/%d) : %s",
                    self.account.login, attempt, self.max_retries, exc,
                )
                # The connection may be open even though login failed:
                # close it so failed attempts do not leak sockets.
                if client is not None:
                    try:
                        client.shutdown()
                    except Exception as close_exc:
                        logger.debug(
                            "[%s] Fermeture après échec impossible : %s",
                            self.account.login, close_exc,
                        )
                if attempt < self.max_retries:
                    # Exponential backoff: retry_delay, 2x, 4x, ...
                    delay = self.retry_delay * (2 ** (attempt - 1))
                    logger.info(
                        "[%s] Nouvelle tentative dans %.1f s…",
                        self.account.login, delay,
                    )
                    time.sleep(delay)

        raise ConnectionError(
            f"[{self.account.login}] Impossible de se connecter après "
            f"{self.max_retries} tentative(s). Dernière erreur : {last_exc}"
        ) from last_exc

    # ── Folder listing ───────────────────────────────────────

    def _list_folders(self, client: ProxyIMAPClient) -> List[str]:
        """Return the last element (the name) of each listed folder entry."""
        return [info[-1] for info in client.list_folders()]

    # ── Message reading ──────────────────────────────────────

    def _parse_message(self, msg_data) -> Optional[MailResult]:
        """Build a MailResult from one fetch response entry.

        Returns None when the entry carries no RFC822 payload or when the
        subject matches none of the configured subjects.
        """
        raw = msg_data.get(b"RFC822") or msg_data.get("RFC822")
        if raw is None:
            return None

        em = email.message_from_bytes(raw)
        subject = _decode_header(em.get("subject", ""))

        # Filter on the configured subjects before the costlier body decode.
        if self._subjects and not any(s in subject for s in self._subjects):
            return None

        body = extract_body(em)
        logger.debug("mail is %s and subject is %s, body is %s",
                     self.account.login, subject, body)

        return MailResult(
            account=self.account.login,
            subject=subject,
            from_address=_decode_header(em.get("FROM", "")),
            to_address=_decode_header(em.get("To", self.account.login)),
            body=body,
            message_id=str(em.get("Message-ID", "")).strip(),
        )

    def _read_folder(
        self,
        client: ProxyIMAPClient,
        folder: str,
        since: Optional[datetime.datetime] = None,
    ) -> List[MailResult]:
        """Return the matching messages of ``folder`` received since ``since``.

        ``since`` defaults to today. Failures to select, search or fetch the
        folder are logged and produce the results gathered so far; a message
        that cannot be parsed is logged and skipped.
        """
        results: List[MailResult] = []
        since = since or datetime.datetime.today()

        try:
            client.select_folder(folder)
        except Exception as exc:
            logger.warning("[%s] Impossible d'ouvrir '%s' : %s",
                           self.account.login, folder, exc)
            return results

        try:
            messages = client.search(['SINCE', since])
        except Exception as exc:
            logger.warning("[%s] Recherche échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        if not messages:
            return results

        logger.debug("uids %s", messages)
        logger.info("[%s] %d message(s) dans '%s'",
                    self.account.login, len(messages), folder)

        try:
            fetched = client.fetch(messages, 'RFC822')
        except Exception as exc:
            logger.warning("[%s] Récupération échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        for uid, msg_data in fetched.items():
            try:
                result = self._parse_message(msg_data)
            except Exception:
                logger.exception(
                    "error trying to read email_message for %s (uid=%s)",
                    self.account.login, uid,
                )
                continue
            if result is not None:
                results.append(result)

        return results

    # ── Public entry point ───────────────────────────────────

    def read(
        self,
        since: Optional[datetime.datetime] = None,
        skip_folders: Optional[List[str]] = None,
    ) -> List[MailResult]:
        """
        Connect to the IMAP server through the proxy and return the emails
        found since ``since`` (today by default), de-duplicated across
        folders.

        Parameters
        ----------
        since        : datetime, optional — start date of the search
        skip_folders : list[str], optional — folder names to ignore
                       (default: ["Sent", "Drafts", "Trash", "Junk", "Spam",
                                  "[Gmail]/All Mail", "[Gmail]/Starred",
                                  "[Gmail]/Important"])

        Raises:
            ConnectionError: when the connection/login cannot be established.
        """
        if skip_folders is None:
            skip_folders = [
                "Sent", "Drafts", "Trash", "Junk", "Spam",
                # Gmail folders that duplicate the content of INBOX
                "[Gmail]/All Mail",
                "[Gmail]/Starred",
                "[Gmail]/Important",
            ]
        skipped = set(skip_folders)

        all_results: List[MailResult] = []
        seen_keys: set = set()  # cross-folder de-duplication

        client = self._connect()
        try:
            folders = self._list_folders(client)
            logger.info("[%s] Dossiers : %s", self.account.login, folders)

            for folder in folders:
                if folder in skipped:
                    logger.debug("[%s] Dossier ignoré : %s",
                                 self.account.login, folder)
                    continue

                for result in self._read_folder(client, folder, since):
                    dedup_key = _dedup_key(result)
                    if dedup_key in seen_keys:
                        logger.debug(
                            "[%s] Doublon ignoré (clé=%s) dans '%s'",
                            self.account.login, str(dedup_key)[:40], folder,
                        )
                        continue
                    seen_keys.add(dedup_key)
                    all_results.append(result)
        finally:
            # Logout is best effort: a failure here must not hide the
            # results or an exception raised above.
            try:
                client.logout()
            except Exception as exc:
                logger.debug("[%s] Logout échoué : %s",
                             self.account.login, exc)

        return all_results


# ──────────────────────────────────────────────────────────────
# Parallel reading of several accounts
# ──────────────────────────────────────────────────────────────

def read_multiple_accounts(
    accounts: List[MailAccount],
    proxy: ProxyConfig,
    since: Optional[datetime.datetime] = None,
    max_workers: int = 10,
    timeout: float = 30.0,
    subjects: Optional[List[str]] = None,
) -> List[MailResult]:
    """
    Read several email accounts in parallel through the same proxy.

    Each account is read by its own ProxyMailReader in a thread pool of
    ``max_workers`` threads. ``subjects`` is forwarded to every reader. An
    account whose read raises is logged and skipped.

    Returns the combined list of MailResult of all accounts that succeeded.
    """
    all_results: List[MailResult] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(
                ProxyMailReader(acc, proxy, timeout, subjects=subjects).read,
                since,
            ): acc.login
            for acc in accounts
        }
        for future in as_completed(future_map):
            login = future_map[future]
            try:
                results = future.result()
                logger.info("[%s] %d email(s) de validation récupéré(s).",
                            login, len(results))
                all_results.extend(results)
            except Exception as exc:
                logger.error("[%s] Erreur : %s", login, exc)

    return all_results


# ──────────────────────────────────────────────────────────────
# Entry point — usage example
# ──────────────────────────────────────────────────────────────

def main() -> None:
    """Read one account configured through environment variables."""
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%H:%M:%S",
    )

    # ── 1. Configure the proxy ───────────────────────────────
    proxy = ProxyConfig(
        host=os.environ.get("GMX_PROXY_HOST", ""),
        port=int(os.environ.get("GMX_PROXY_PORT", "443")),
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )

    # ── 2. Define the accounts to read ───────────────────────
    # Credentials come from the environment (or .env); they must never be
    # committed to the source file.
    login = os.environ.get("GMX_LOGIN")
    password = os.environ.get("GMX_PASSWORD")
    if not login or not password:
        sys.exit(
            "Définissez GMX_LOGIN et GMX_PASSWORD dans l'environnement "
            "(ou le fichier .env)."
        )
    accounts = [MailAccount(login=login, password=password)]

    # ── 3. Run the read ──────────────────────────────────────
    results = read_multiple_accounts(
        accounts=accounts,
        proxy=proxy,
        since=datetime.datetime.today(),
        max_workers=5,
        timeout=30.0,
    )

    # ── 4. Print the results ─────────────────────────────────
    print(f"\n{'=' * 60}")
    print(f" {len(results)} email(s) de validation trouvé(s)")
    print(f"{'=' * 60}\n")

    for r in results:
        print(f" Compte : {r.account}")
        print(f" De : {r.from_address}")
        print(f" Sujet : {r.subject}")
        print(f" URLs : {r.validation_url or 'aucune'}")
        print(f" {'-' * 56}")


if __name__ == "__main__":
    main()