Files
appointment_request/mail/imap_proxy_reader.py
T

659 lines
24 KiB
Python

"""
imap_proxy_reader.py
====================
Lire des emails via IMAPClient en passant par un proxy SOCKS5/SOCKS4/HTTP.
Fonctionnement :
- ProxyIMAP4_TLS : sous-classe de imaplib.IMAP4 qui ouvre la socket
à travers un proxy SOCKS via PySocks.
- ProxyIMAPClient : sous-classe de IMAPClient qui injecte ProxyIMAP4_TLS
au lieu de la connexion directe habituelle.
Dépendances :
pip install imapclient PySocks
"""
import datetime
import email
import imaplib
import io
import logging
import os
import re
import ssl
import socket
from dataclasses import dataclass, field
from email.message import Message
from typing import List, Optional, Tuple
import random
import socks
from dotenv import load_dotenv
from imapclient import IMAPClient
load_dotenv()
# ──────────────────────────────────────────────────────────────
# Constantes
# ──────────────────────────────────────────────────────────────
VALIDATION_URL_SUBJECT_FR = "Validation de votre demande de rendez-vous"
VALIDATION_URL_SUBJECT_EN = "Please confirm your appointment request"
VALIDATION_URL_REGEX = (
r"https:\/\/rendezvousparis\.hermes\.com"
r"\/client\/register\/[A-Z0-9]+\/validate[?.]code=[A-Z0-9]+"
)
DATE_FORMAT = "%d-%b-%Y"
# Correspondance domaine → serveur IMAP (identique à mail_constants.py)
IMAP_SERVER_MAP: List[Tuple[str, str]] = [
("163.com", "imap.163.com"),
("yahoo.com", "imap.mail.yahoo.com"),
("firemail.de", "imap.firemail.de"),
("gmail.com", "imap.gmail.com"),
("sina.com", "imap.sina.com"),
("hotmail.com", "outlook.office365.com"),
("outlook.com", "outlook.office365.com"),
("rambler.ru", "imap.rambler.ru"),
("btvm.ne.jp", "imap.btvm.ne.jp"),
("mars.dti.ne.jp", "imap.cm.dream.jp"),
("aurora.dti.ne.jp", "imap.cm.dream.jp"),
("naver.com", "imap.naver.com"),
("onet.pl", "imap.poczta.onet.pl"),
("gazeta.pl", "imap.gazeta.pl"),
("tim.it", "imap.tim.it"),
("alice.it", "in.alice.it"),
("gmx.com", "imap.gmx.com"),
("gmx.fr", "imap.gmx.com"),
("gmx.us", "imap.gmx.com"),
("gmx.ch", "imap.gmx.com"),
("gmx.pt", "imap.gmx.com"),
("gmx.sg", "imap.gmx.com"),
("gmx.net", "imap.gmx.net"),
("gmx.de", "imap.gmx.net"),
("gmx.at", "imap.gmx.at"),
("web.de", "imap.web.de"),
("inbox.lv", "mail.inbox.lv"),
("pissmail.com", "mail.pissmail.com"),
("incel.email", "mail.pissmail.com"),
("shitposting.expert","mail.pissmail.com"),
("hatesje.ws", "mail.pissmail.com"),
("child.pizza", "mail.pissmail.com"),
("genocide.fun", "mail.pissmail.com"),
("dmc.chat", "mail.pissmail.com"),
("aol.com", "imap.aol.com"), # fallback AOL
]
PROXY_TYPE_MAP = {
"SOCKS5": socks.SOCKS5,
"SOCKS4": socks.SOCKS4,
"HTTP": socks.HTTP,
}
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# Modèles de données
# ──────────────────────────────────────────────────────────────
@dataclass
class ProxyConfig:
"""Configuration du proxy."""
host: str
port: int
proxy_type: str = "SOCKS5" # "SOCKS5" | "SOCKS4" | "HTTP"
username: Optional[str] = None
password: Optional[str] = None
@property
def socks_type(self) -> int:
t = self.proxy_type.upper()
if t not in PROXY_TYPE_MAP:
raise ValueError(f"proxy_type invalide : {self.proxy_type!r}. "
f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}")
return PROXY_TYPE_MAP[t]
def __repr__(self) -> str:
auth = f"{self.username}:***@" if self.username else ""
return f"{self.proxy_type}://{auth}{self.host}:{self.port}"
@dataclass
class MailAccount:
"""Compte email à lire."""
login: str
password: str
@dataclass
class MailResult:
"""Résultat d'une lecture d'email."""
account: str
subject: str
from_address: str
to_address: str
body: str
validation_urls: List[str] = field(default_factory=list)
# ──────────────────────────────────────────────────────────────
# Connexion IMAP via proxy (bas niveau)
# ──────────────────────────────────────────────────────────────
class ProxyIMAP4_TLS(imaplib.IMAP4):
"""
Variante TLS de imaplib.IMAP4 qui route la connexion
à travers un proxy SOCKS5/SOCKS4/HTTP grâce à PySocks.
"""
def __init__(
self,
host: str,
port: int,
ssl_context: Optional[ssl.SSLContext],
proxy: ProxyConfig,
timeout: Optional[float] = None,
):
self._ssl_context = ssl_context
self._proxy = proxy
self._timeout = timeout
# imaplib.IMAP4.__init__ appelle self.open()
imaplib.IMAP4.__init__(self, host, port)
self.file: io.BufferedReader
def open(self, host: str = "", port: int = 993, timeout: Optional[float] = None) -> None:
self.host = host
self.port = port
effective_timeout = timeout if timeout is not None else self._timeout
# ── Créer la socket SOCKS ────────────────────────────
sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
sock.set_proxy(
proxy_type=self._proxy.socks_type,
addr=self._proxy.host,
port=self._proxy.port,
username=self._proxy.username,
password=self._proxy.password,
)
if effective_timeout:
sock.settimeout(effective_timeout)
sock.connect((host, port))
# ── Envelopper avec SSL/TLS ──────────────────────────
ctx = self._ssl_context or ssl.create_default_context()
self.sock = ctx.wrap_socket(sock, server_hostname=host)
self.file = self.sock.makefile("rb")
# ── Méthodes requises par imaplib.IMAP4 ─────────────────
def read(self, size: int) -> bytes:
return self.file.read(size) # type: ignore[return-value]
def readline(self) -> bytes:
return self.file.readline() # type: ignore[return-value]
def send(self, data) -> None:
self.sock.sendall(data)
def shutdown(self) -> None:
imaplib.IMAP4.shutdown(self)
def id(self, parameters: dict) -> tuple:
"""
Envoie la commande IMAP ID (RFC 2971).
parameters : dict ex. {"name": "Thunderbird", "version": "115.0"}
Retourne le tuple brut (typ, data) renvoyé par le serveur.
"""
args = " ".join(
'"{}"'.format(str(v).replace('"', '\\"'))
for pair in parameters.items()
for v in pair
)
return self._simple_command("ID", "({})".format(args))
# ──────────────────────────────────────────────────────────────
# Profils de clients IMAP réels (pour spoofing du fingerprint)
# ──────────────────────────────────────────────────────────────
_IMAP_CLIENT_PROFILES = [
# Mozilla Thunderbird 115 (ESR) — Windows
{
"name": "Thunderbird",
"version": "115.9.0",
"vendor": "Mozilla",
"support-url": "https://support.mozilla.org/",
"command": "IMAP4rev1",
"os": "Windows NT 10.0",
"os-version": "10.0",
},
# Mozilla Thunderbird 115 — macOS
{
"name": "Thunderbird",
"version": "115.9.0",
"vendor": "Mozilla",
"support-url": "https://support.mozilla.org/",
"command": "IMAP4rev1",
"os": "macOS",
"os-version": "14.4",
},
# Apple Mail — macOS Sonoma
{
"name": "Mac OS X Mail",
"version": "16.0",
"vendor": "Apple Inc.",
"support-url": "https://support.apple.com/mail",
"os": "Mac OS X",
"os-version": "14.4",
},
# Apple Mail — iOS
{
"name": "iPhone Mail",
"version": "17.4",
"vendor": "Apple Inc.",
"os": "iOS",
"os-version": "17.4",
},
# Outlook pour Windows (MAPI/IMAP bridge)
{
"name": "Microsoft Outlook",
"version": "16.0.17531.20108",
"vendor": "Microsoft Corporation",
"support-url": "https://support.microsoft.com/outlook",
"os": "Windows NT 10.0",
"os-version": "10.0",
},
]
def _random_imap_id_params() -> dict:
"""Retourne un profil aléatoire parmi les clients IMAP réels."""
return random.choice(_IMAP_CLIENT_PROFILES)
def send_imap_id(imap, params: Optional[dict] = None) -> None:
"""
Envoie la commande IMAP ID après connexion pour usurper le fingerprint
client. Fonctionne avec IMAPClient (imapclient) et imaplib.IMAP4.
Paramètres
----------
imap : IMAPClient | imaplib.IMAP4
params : dict, optional — si None, un profil aléatoire est choisi.
"""
if params is None:
params = _random_imap_id_params()
try:
if isinstance(imap, IMAPClient):
# imapclient expose _imap (l'objet imaplib sous-jacent)
_raw = imap._imap
if hasattr(_raw, "id"):
_raw.id(params)
else:
# Fallback : commande brute via imapclient
args = " ".join(
'"{}"'.format(str(v).replace('"', '\\"'))
for pair in params.items()
for v in pair
)
imap._imap._simple_command("ID", "({})".format(args))
elif hasattr(imap, "id"):
# ProxyIMAP4_TLS ou tout imaplib.IMAP4 patchable
imap.id(params)
else:
# Dernier recours : commande brute imaplib
args = " ".join(
'"{}"'.format(str(v).replace('"', '\\"'))
for pair in params.items()
for v in pair
)
imap._simple_command("ID", "({})".format(args))
except Exception as exc:
logger.debug("IMAP ID non supporté ou ignoré : %s", exc)
class ProxyIMAPClient(IMAPClient):
"""
Sous-classe d'IMAPClient qui utilise un proxy SOCKS/HTTP.
Usage :
proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5")
client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True)
client.login("user@gmail.com", "password")
"""
def __init__(self, host: str, proxy: ProxyConfig, **kwargs):
self._proxy = proxy
super().__init__(host, **kwargs)
def _create_IMAP4(self):
"""Remplace la méthode d'IMAPClient pour injecter ProxyIMAP4_TLS."""
if self.ssl:
# self._timeout peut être un float (secondes) ou un objet avec
# un attribut 'connect' (ex : urllib3 Timeout). On gère les deux.
_timeout = self._timeout
if _timeout is not None and not isinstance(_timeout, (int, float)):
_timeout = getattr(_timeout, "connect", None)
return ProxyIMAP4_TLS(
host=self.host,
port=self.port,
ssl_context=self.ssl_context,
proxy=self._proxy,
timeout=_timeout,
)
# Connexion non-SSL à travers le proxy (rare, mais supporté)
# On monkey-patch juste la connexion TCP
raise NotImplementedError(
"Connexion IMAP non-SSL via proxy non implémentée. "
"Utilisez ssl=True (port 993)."
)
def login(self, username: str, password: str):
"""Surcharge login() pour envoyer IMAP ID juste après l'authentification."""
result = super().login(username, password)
send_imap_id(self)
return result
# ──────────────────────────────────────────────────────────────
# Fonctions utilitaires
# ──────────────────────────────────────────────────────────────
def get_imap_server(login: str) -> str:
"""Retourne le serveur IMAP correspondant au domaine du login."""
login_lower = login.lower()
for domain, server in IMAP_SERVER_MAP:
if domain in login_lower:
return server
return "imap.aol.com" # fallback
def extract_body(email_message: Message) -> str:
"""Extrait le corps HTML ou texte d'un email."""
body = ""
for part in email_message.walk():
content_type = part.get_content_type()
try:
if content_type == "text/html":
payload = part.get_payload(decode=True)
if payload:
body += payload.decode("utf-8", errors="ignore")
elif content_type == "text/plain":
payload = part.get_payload()
if payload:
body += str(payload)
except Exception as exc:
logger.warning("Erreur extraction body : %s", exc)
return body
def find_validation_urls(text: str) -> List[str]:
"""Recherche toutes les URLs de validation Hermes dans un texte."""
return re.findall(VALIDATION_URL_REGEX, text)
# ──────────────────────────────────────────────────────────────
# Lecteur principal
# ──────────────────────────────────────────────────────────────
class ProxyMailReader:
"""
Lit les emails d'un compte via IMAPClient en passant par un proxy.
Paramètres
----------
account : MailAccount
Identifiants du compte email.
proxy : ProxyConfig
Configuration du proxy.
timeout : float, optional
Timeout de connexion en secondes (défaut : 30 s).
"""
def __init__(
self,
account: MailAccount,
proxy: ProxyConfig,
timeout: float = 30.0,
):
self.account = account
self.proxy = proxy
self.timeout = timeout
# ── Connexion ────────────────────────────────────────────
def _connect(self) -> ProxyIMAPClient:
imap_server = get_imap_server(self.account.login)
logger.info(
"[%s] Connexion via %s%s:993",
self.account.login, self.proxy, imap_server,
)
client = ProxyIMAPClient(
host=imap_server,
proxy=self.proxy,
use_uid=True,
ssl=True,
timeout=self.timeout,
)
client.login(self.account.login, self.account.password)
logger.info("[%s] Connecté.", self.account.login)
return client
# ── Lecture des dossiers ─────────────────────────────────
def _list_folders(self, client: ProxyIMAPClient) -> List[str]:
return [info[-1] for info in client.list_folders()]
# ── Lecture des messages ─────────────────────────────────
def _read_folder(
self,
client: ProxyIMAPClient,
folder: str,
since: Optional[datetime.datetime] = None,
) -> List[MailResult]:
results: List[MailResult] = []
since = since or datetime.datetime.today()
try:
client.select_folder(folder, readonly=True)
except Exception as exc:
logger.warning("[%s] Impossible d'ouvrir '%s' : %s",
self.account.login, folder, exc)
return results
try:
uids = client.search(["SINCE", since])
except Exception as exc:
logger.warning("[%s] Recherche échouée dans '%s' : %s",
self.account.login, folder, exc)
return results
if not uids:
return results
logger.info("[%s] %d message(s) dans '%s'",
self.account.login, len(uids), folder)
for uid, msg_data in client.fetch(uids, "RFC822").items():
try:
raw = msg_data.get(b"RFC822") or msg_data.get("RFC822")
if raw is None:
continue
em = email.message_from_bytes(raw)
subject = em.get("Subject", "")
from_addr = em.get("From", "")
to_addr = em.get("To", self.account.login)
# Filtrer : on ne garde que les emails de validation Hermes
is_validation = (
VALIDATION_URL_SUBJECT_FR in subject
or VALIDATION_URL_SUBJECT_EN in subject
or "no-reply@hermes.com" in from_addr.lower()
)
if not is_validation:
continue
body = extract_body(em)
urls = find_validation_urls(body)
result = MailResult(
account=self.account.login,
subject=subject,
from_address=from_addr,
to_address=to_addr,
body=body,
validation_urls=urls,
)
results.append(result)
logger.info(
"[%s] Email de validation trouvé (uid=%s) — URLs : %s",
self.account.login, uid, urls or "aucune",
)
except Exception as exc:
logger.warning(
"[%s] Erreur traitement uid=%s : %s",
self.account.login, uid, exc,
)
return results
# ── Point d'entrée public ────────────────────────────────
def read(
self,
since: Optional[datetime.datetime] = None,
skip_folders: Optional[List[str]] = None,
) -> List[MailResult]:
"""
Se connecte au serveur IMAP via le proxy et retourne la liste
des emails de validation trouvés depuis `since` (aujourd'hui par défaut).
Paramètres
----------
since : datetime, optional — date de début de recherche
skip_folders : list[str], optional — dossiers à ignorer
(défaut : ["Sent", "Drafts", "Trash", "Junk", "Spam"])
"""
if skip_folders is None:
skip_folders = ["Sent", "Drafts", "Trash", "Junk", "Spam"]
all_results: List[MailResult] = []
client = self._connect()
try:
folders = self._list_folders(client)
logger.info("[%s] Dossiers : %s", self.account.login, folders)
for folder in folders:
if folder in skip_folders:
logger.debug("[%s] Dossier ignoré : %s",
self.account.login, folder)
continue
all_results.extend(self._read_folder(client, folder, since))
finally:
try:
client.logout()
except Exception:
pass
return all_results
# ──────────────────────────────────────────────────────────────
# Lecture parallèle de plusieurs comptes
# ──────────────────────────────────────────────────────────────
from concurrent.futures import ThreadPoolExecutor, as_completed
def read_multiple_accounts(
accounts: List[MailAccount],
proxy: ProxyConfig,
since: Optional[datetime.datetime] = None,
max_workers: int = 10,
timeout: float = 30.0,
) -> List[MailResult]:
"""
Lit plusieurs comptes email en parallèle via le même proxy.
Retourne la liste consolidée de tous les MailResult trouvés.
"""
all_results: List[MailResult] = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(
ProxyMailReader(acc, proxy, timeout).read, since
): acc.login
for acc in accounts
}
for future in as_completed(future_map):
login = future_map[future]
try:
results = future.result()
logger.info("[%s] %d email(s) de validation récupéré(s).",
login, len(results))
all_results.extend(results)
except Exception as exc:
logger.error("[%s] Erreur : %s", login, exc)
return all_results
# ──────────────────────────────────────────────────────────────
# Point d'entrée — exemple d'utilisation
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)-8s %(message)s",
datefmt="%H:%M:%S",
)
# ── 1. Configurer le proxy ───────────────────────────────
proxy = ProxyConfig(
host=os.environ.get("GMX_PROXY_HOST", ""),
port=int(os.environ.get("GMX_PROXY_PORT", "443")),
proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
username=os.environ.get("GMX_PROXY_USERNAME"),
password=os.environ.get("GMX_PROXY_PASSWORD"),
)
# ── 2. Définir les comptes à lire ────────────────────────
accounts = [
MailAccount(login="birgitnaya@gmx.net", password="XEeUF3Y1yaO"),
# MailAccount(login="user@gmail.com", password="apppassword"),
# MailAccount(login="user@outlook.com", password="password"),
]
# ── 3. Lancer la lecture ─────────────────────────────────
results = read_multiple_accounts(
accounts=accounts,
proxy=proxy,
since=datetime.datetime.today(),
max_workers=5,
timeout=30.0,
)
# ── 4. Afficher les résultats ────────────────────────────
print(f"\n{'='*60}")
print(f" {len(results)} email(s) de validation trouvé(s)")
print(f"{'='*60}\n")
for r in results:
print(f" Compte : {r.account}")
print(f" De : {r.from_address}")
print(f" Sujet : {r.subject}")
print(f" URLs : {r.validation_urls or 'aucune'}")
print(f" {'-'*56}")