need to change read mail body

This commit is contained in:
2026-04-02 23:10:25 +02:00
parent e2c6483911
commit 40d479b2fc
+103 -56
View File
@@ -15,6 +15,7 @@ Dépendances :
import datetime
import email
import hashlib
import imaplib
import io
import logging
@@ -23,6 +24,7 @@ import re
import socket
import ssl
import sys
import time
from dataclasses import dataclass
from email.message import Message
from typing import List, Optional, Tuple
@@ -37,13 +39,6 @@ load_dotenv()
# Constantes
# ──────────────────────────────────────────────────────────────
VALIDATION_URL_SUBJECT_FR = "Validation de votre demande de rendez-vous"
VALIDATION_URL_SUBJECT_EN = "Please confirm your appointment request"
VALIDATION_URL_REGEX = (
r"https:\/\/rendezvousparis\.hermes\.com"
r"\/client\/register\/[A-Z0-9]+\/validate\.code=[A-Z0-9]+"
)
DATE_FORMAT = "%d-%b-%Y"
# Correspondance domaine → serveur IMAP (identique à mail_constants.py)
@@ -77,7 +72,7 @@ IMAP_SERVER_MAP: List[Tuple[str, str]] = [
("inbox.lv", "mail.inbox.lv"),
("pissmail.com", "mail.pissmail.com"),
("incel.email", "mail.pissmail.com"),
("shitposting.expert","mail.pissmail.com"),
("shitposting.expert", "mail.pissmail.com"),
("hatesje.ws", "mail.pissmail.com"),
("child.pizza", "mail.pissmail.com"),
("genocide.fun", "mail.pissmail.com"),
@@ -91,7 +86,6 @@ PROXY_TYPE_MAP = {
"HTTP": socks.HTTP,
}
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
@@ -138,6 +132,8 @@ class MailResult:
from_address: str
to_address: str
body: str
message_id: str = "" # Header Message-ID
validation_url: str = "" # Première URL Hermes trouvée dans le corps
# ──────────────────────────────────────────────────────────────
@@ -260,31 +256,7 @@ class ProxyIMAPClient(IMAPClient):
since: Optional[datetime.datetime] = None,
extra_criteria: Optional[List] = None,
) -> List[int]:
"""
Recherche les UIDs des emails dont le sujet correspond à l'un
des sujets stockés dans ``self.subjects``.
Si ``self.subjects`` est vide, retourne tous les messages
depuis ``since`` (sans filtre par sujet).
Paramètres
----------
since : datetime, optional
Filtre SINCE (aujourd'hui par défaut).
extra_criteria : list, optional
Critères IMAP supplémentaires à combiner (AND implicite).
Retourne
--------
list[int] — UIDs correspondants (peut être vide).
Exemple
-------
client.subjects = ["Confirmation RDV", "confirmed"]
uids = client.search_by_subjects(since=datetime.datetime.today())
"""
since = since or datetime.datetime.today()
base: List = ["SINCE", since]
base: List = ["SINCE", datetime.datetime.today()]
if extra_criteria:
base.extend(extra_criteria)
@@ -336,9 +308,27 @@ def extract_body(email_message: Message) -> str:
return body
def find_validation_urls(text: str) -> List[str]:
"""Recherche toutes les URLs de validation Hermes dans un texte."""
return re.findall(VALIDATION_URL_REGEX, text)
def _dedup_key(result: MailResult) -> tuple:
"""
Calcule une clé de déduplication pour un MailResult.
Priorité :
1. URL de validation Hermes — unique par rendez-vous, 100 % fiable
2. Message-ID — unique par email selon RFC 5322
3. hash MD5 du corps complet — fallback contenu quand les deux
champs précédents sont absents
(ex : certains serveurs 163.com / Yahoo
n'ajoutent pas de Message-ID et peuvent
présenter le même email depuis plusieurs
dossiers avec des corps légèrement
différents en encodage — on normalise
avant de hacher)
"""
# Normalisation avant hash : on retire les espaces/sauts de ligne
# superflus pour absorber les différences mineures d'encodage
normalized = re.sub(r"\s+", " ", result.body).strip()
body_hash = hashlib.md5(normalized.encode("utf-8", errors="ignore")).hexdigest()
return ("body", body_hash)
# ──────────────────────────────────────────────────────────────
@@ -374,10 +364,14 @@ class ProxyMailReader:
timeout: float = 30.0,
subjects: Optional[List[str]] = None,
from_addresses: Optional[List[str]] = None,
max_retries: int = 8,
retry_delay: float = 2.0,
):
self.account = account
self.proxy = proxy
self.timeout = timeout
self.max_retries = max_retries
self.retry_delay = retry_delay
self._subjects = []
if subjects:
self._subjects.extend(subjects)
@@ -390,21 +384,47 @@ class ProxyMailReader:
def _connect(self) -> ProxyIMAPClient:
imap_server = get_imap_server(self.account.login)
last_exc: Optional[Exception] = None
for attempt in range(1, self.max_retries + 1):
logger.info(
"[%s] Connexion via %s%s:993",
self.account.login, self.proxy, imap_server,
"[%s] Tentative %d/%d Connexion via %s%s:993",
self.account.login, attempt, self.max_retries,
self.proxy, imap_server,
)
try:
client = ProxyIMAPClient(
host=imap_server,
proxy=self.proxy,
subjects=self._subjects, # propagation des sujets vers le client bas niveau
subjects=self._subjects,
use_uid=True,
ssl=True,
timeout=self.timeout,
)
client.login(self.account.login, self.account.password)
logger.info("[%s] Connecté. Sujets recherchés : %s", self.account.login, self._subjects)
logger.info(
"[%s] Connecté (tentative %d). Sujets recherchés : %s",
self.account.login, attempt, self._subjects,
)
return client
except Exception as exc:
last_exc = exc
logger.warning(
"[%s] Échec connexion/login (tentative %d/%d) : %s",
self.account.login, attempt, self.max_retries, exc,
)
if attempt < self.max_retries:
delay = self.retry_delay * (2 * (attempt - 1))
logger.info(
"[%s] Nouvelle tentative dans %.1f s…",
self.account.login, delay,
)
time.sleep(delay)
raise ConnectionError(
f"[{self.account.login}] Impossible de se connecter après "
f"{self.max_retries} tentative(s). Dernière erreur : {last_exc}"
) from last_exc
# ── Lecture des dossiers ─────────────────────────────────
@@ -421,6 +441,8 @@ class ProxyMailReader:
) -> List[MailResult]:
results: List[MailResult] = []
since = since or datetime.datetime.today()
# 用于去重:同一主题+发信人只读第一封
seen_subject_from: set = set()
try:
client.select_folder(folder, readonly=True)
@@ -439,7 +461,7 @@ class ProxyMailReader:
if not uids:
return results
print("uids {}".format(uids))
logger.info("[%s] %d message(s) dans '%s'",
self.account.login, len(uids), folder)
@@ -453,6 +475,19 @@ class ProxyMailReader:
subject = em.get("Subject", "")
from_addr = em.get("From", "")
to_addr = em.get("To", self.account.login)
message_id = em.get("Message-ID", "").strip()
print("subject {}".format(subject))
print("message_id {}".format(message_id))
# 去重:同一主题+发信人只读第一封
dedup_key = (subject, from_addr)
if dedup_key in seen_subject_from:
logger.debug(
"[%s] Doublon ignoré (même sujet et expéditeur) dans '%s': %s",
self.account.login, folder, subject[:50]
)
continue
seen_subject_from.add(dedup_key)
# Filtrer : on ne garde que les emails correspondant aux sujets/expéditeurs configurés
is_validation = (
@@ -468,13 +503,10 @@ class ProxyMailReader:
subject=subject,
from_address=from_addr,
to_address=to_addr,
body=body
body=body,
message_id=message_id
)
results.append(result)
logger.info(
"[%s] Email trouvé (uid=%s) — URLs : %s",
self.account.login, uid
)
except Exception as exc:
logger.warning(
"[%s] Erreur traitement uid=%s : %s",
@@ -498,12 +530,19 @@ class ProxyMailReader:
----------
since : datetime, optional — date de début de recherche
skip_folders : list[str], optional — dossiers à ignorer
(défaut : ["Sent", "Drafts", "Trash", "Junk", "Spam"])
(défaut : ["Sent", "Drafts", "Trash", "Junk", "Spam",
"[Gmail]/All Mail", "[Gmail]/Starred",
"[Gmail]/Important"])
"""
if skip_folders is None:
skip_folders = ["Sent", "Drafts", "Trash", "Junk", "Spam"]
skip_folders = [
"Sent", "Drafts", "Trash", "Junk", "Spam",
# Dossiers Gmail qui dupliquent le contenu d'INBOX
"[Gmail]/All Mail", "[Gmail]/Starred", "[Gmail]/Important",
]
all_results: List[MailResult] = []
seen_message_ids: set = set() # déduplication inter-dossiers
client = self._connect()
try:
@@ -515,7 +554,18 @@ class ProxyMailReader:
logger.debug("[%s] Dossier ignoré : %s",
self.account.login, folder)
continue
all_results.extend(self._read_folder(client, folder, since))
for result in self._read_folder(client, folder, since):
dedup_key = _dedup_key(result)
if dedup_key in seen_message_ids:
logger.debug(
"[%s] Doublon ignoré (clé=%s) dans '%s'",
self.account.login, str(dedup_key)[:40], folder,
)
continue
seen_message_ids.add(dedup_key)
all_results.append(result)
finally:
try:
client.logout()
@@ -603,16 +653,13 @@ if __name__ == "__main__":
)
# ── 4. Afficher les résultats ────────────────────────────
print(f"\n{'='*60}")
print(f"\n{'=' * 60}")
print(f" {len(results)} email(s) de validation trouvé(s)")
print(f"{'='*60}\n")
print(f"{'=' * 60}\n")
for r in results:
print(f" Compte : {r.account}")
print(f" De : {r.from_address}")
print(f" Sujet : {r.subject}")
print(f" URLs : {r.validation_urls or 'aucune'}")
print(f" {'-'*56}")
print(f" {'-' * 56}")