Compare commits

...

12 Commits

Author SHA1 Message Date
panleicim 44c97bcd06 read only received from no-reply@hermes.com mails 2026-04-25 11:31:05 +02:00
panleicim 3a3a36082b improvement while reading mails 2026-04-24 18:20:41 +02:00
panleicim 64e47e05e7 add AGENTS.md 2026-04-23 22:26:54 +02:00
panleicim 6b05eb38d7 exclude proxy for inbox.lv 2026-04-23 22:20:22 +02:00
panleicim 7020866e92 fix for migration cached data 2026-04-17 00:30:00 +02:00
panleicim a2bb4caa74 exclude proxy for inbox.lv 2026-04-16 11:17:39 +02:00
panleicim d45d6f4d7d proxy for inbox.lv 2026-04-09 08:02:23 +02:00
panleicim c84266f8fd add timeout for reading mails 2026-04-04 11:28:49 +02:00
panleicim 1fa29ebd37 add deps to requirements.txt 2026-04-02 23:32:57 +02:00
panleicim ea8673540c print failed gmx mails 2026-03-28 12:13:06 +01:00
panleicim 228c0b9bbb use proxy while reading gmx mails 2026-03-28 11:01:08 +01:00
Lei PAN b224c75ad0 password in env 2026-03-06 19:55:25 +01:00
11 changed files with 1437 additions and 370 deletions
+63
View File
@@ -0,0 +1,63 @@
# AGENTS.md
## Project
Python bot automating appointment requests (French immigration/visa). Multi-threaded HTTP workers + MongoDB tracking + RabbitMQ queuing. FastAPI wrapper (`api/`) provides HTTP/WebSocket control plane.
## Install
```bash
pip install -r requirements.txt # main bot
pip install pymongo # NOT in requirements.txt but required by db/mongo_manager.py
pip install -r api_requirements.txt # FastAPI wrapper only
```
## Run Commands
```bash
python scheduler.py # production cron (Mon–Sat 10:30 Europe/Paris)
python scheduler_test.py # test variant (Mon–Sun)
python request_sender_test.py # manual run — edit hardcoded `file_list` at bottom first
python scheduler_link_validator.py
uvicorn api.main:app --host 0.0.0.0 --port 8000 --reload # FastAPI server
```
## No Tests / No Lint / No Typecheck
`*_test.py` files are **standalone scripts**, not pytest tests. Run them with `python`, not `pytest`. There is no test framework, no linter, no formatter, no CI configured.
## Key Quirks
- **`utiles.py` (intentional typo)** — time utility module. Imported as `from utiles import is_time_between`. Do not rename.
- **`requirements.txt` incomplete** — `pymongo` missing; must be installed separately.
- **`venv/` is committed** — do not delete.
- **`.env` file required** for FastAPI server: `API_KEY`, `API_HOST`, `API_PORT`, `LOG_LEVEL`, `ALLOWED_ORIGINS`, `DEFAULT_THREAD_NUMBER`, `DEFAULT_STOP_HOUR`, `DEFAULT_STOP_MINS`.
- **MongoDB env vars** — `MONGO_USERNAME` and `MONGO_PASSWORD` must be set; `db/mongo_manager.py` raises `ValueError` if missing. Hardcoded host: `mongo2.lpaconsulting.fr`.
- **RabbitMQ credentials** hardcoded in `queue_message/CookiesPublisher.py`. Host: `appointment.lpaconsulting.fr:5672`.
- **`curl_cffi`** used for HTTP (not `requests`/`httpx`) to bypass TLS fingerprinting / bot detection.
- **Module-level side effects** — importing `request_sender.py` calls `init_logger()` at import time (no `__main__` guard).
## External Service Dependencies
Bot will fail without network access to:
- MongoDB at `mongo2.lpaconsulting.fr` (needs env vars)
- RabbitMQ at `appointment.lpaconsulting.fr:5672`
- Proxy servers defined in `workers/proxies_constants.py` and `workers/proxy_constants.py`
## Structure
```
api/ FastAPI control plane (start/stop, log streaming)
captcha/ CAPTCHA solver wrapper
db/ MongoDB singleton (MONGO_STORE_MANAGER)
mail/ IMAP helpers
models/ Data POJOs (ContactPojo, LinkPojo, etc.)
proxy_manager/ Proxy list management
queue_message/ RabbitMQ publisher/consumer (pika)
utils/ Misc helpers (logging, cookies, JS data)
workers/ Core HTTP workers (sender, link validator, cookie generator)
excel_reader.py Reads .xlsx contact lists via openpyxl
utiles.py Time utility (note spelling)
scheduler.py Production APScheduler entry point
```
+119 -32
View File
@@ -1,6 +1,8 @@
import datetime import datetime
import logging import logging
import time import time
import os
from typing import Optional
from pymongo import MongoClient from pymongo import MongoClient
@@ -21,11 +23,27 @@ DESTINATION_EMAIL_LIST = "DESTINATION_EMAIL_LIST"
LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE" LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE"
INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST" INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST"
CONTACT_LIST_SERIAL_MAP = "CONTACT_LIST_SERIAL_MAP" CONTACT_LIST_SERIAL_MAP = "CONTACT_LIST_SERIAL_MAP"
MAIL_READ_LOG = "MAIL_READ_LOG" # 记录每个邮箱上次读取时间
class MongoDbManager: class MongoDbManager:
def __init__(self): def __init__(self):
client = MongoClient(MONGO_DB_URL, username='appointment', password='Rdv@20222021', authSource='appointment') # Get username and password from environment variables
mongo_username = os.getenv("MONGO_USERNAME")
mongo_password = os.getenv("MONGO_PASSWORD")
# Validate that environment variables exist
if not mongo_username or not mongo_password:
raise ValueError(
"MONGO_USERNAME and MONGO_PASSWORD environment variables must be set"
)
client = MongoClient(
MONGO_DB_URL,
username=mongo_username,
password=mongo_password,
authSource="appointment",
)
self.db = client.appointment self.db = client.appointment
self.logger = logging.getLogger("mongoDb") self.logger = logging.getLogger("mongoDb")
@@ -36,8 +54,13 @@ class MongoDbManager:
def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo): def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo):
try: try:
collection_to_use = self.db[collection_name] collection_to_use = self.db[collection_name]
collection_to_use.replace_one(filter={'_id': reserve.id, }, replacement=reserve.to_firestore_dict(), collection_to_use.replace_one(
upsert=True) filter={
"_id": reserve.id,
},
replacement=reserve.to_firestore_dict(),
upsert=True,
)
except Exception as Error: except Exception as Error:
self.logger.info(Error) self.logger.info(Error)
@@ -83,8 +106,14 @@ class MongoDbManager:
result_list.append(ContactPojo.from_firestore_dict(document)) result_list.append(ContactPojo.from_firestore_dict(document))
return result_list return result_list
def save_links_to_validate(self, link: str, mail_address: str, model: str, def save_links_to_validate(
_all_contact_list: list, _used_ip: str = ""): self,
link: str,
mail_address: str,
model: str,
_all_contact_list: list,
_used_ip: str = "",
):
collection_to_use = self.db[LINKS_TO_VALIDATE] collection_to_use = self.db[LINKS_TO_VALIDATE]
updated_at = time.strftime("%H:%M:%S", time.localtime()) updated_at = time.strftime("%H:%M:%S", time.localtime())
_ip_country = "FR" _ip_country = "FR"
@@ -95,32 +124,42 @@ class MongoDbManager:
_ip_country = _contact.ip_country _ip_country = _contact.ip_country
if len(mail_address) > 0: if len(mail_address) > 0:
collection_to_use.replace_one(filter={'_id': mail_address, }, replacement={ collection_to_use.replace_one(
u'url': link, filter={
u'email': mail_address, "_id": mail_address,
u'serial': serial,
u'model': model,
u'ip_country': _ip_country,
u'_used_ip': _used_ip,
"updated_at": updated_at
}, },
upsert=True) replacement={
"url": link,
"email": mail_address,
"serial": serial,
"model": model,
"ip_country": _ip_country,
"_used_ip": _used_ip,
"updated_at": updated_at,
},
upsert=True,
)
else: else:
collection_to_use.replace_one(filter={'_id': link, }, replacement={ collection_to_use.replace_one(
u'url': link, filter={
u'serial': serial, "_id": link,
u'model': model,
u'ip_country': _ip_country,
u'_used_ip': _used_ip,
"updated_at": updated_at
}, },
upsert=True) replacement={
"url": link,
"serial": serial,
"model": model,
"ip_country": _ip_country,
"_used_ip": _used_ip,
"updated_at": updated_at,
},
upsert=True,
)
def get_code_for_email(self, email: str): def get_code_for_email(self, email: str):
collection_name = DESTINATION_EMAIL_LIST collection_name = DESTINATION_EMAIL_LIST
try: try:
collection_to_use = self.db[collection_name] collection_to_use = self.db[collection_name]
mailDocument = collection_to_use.find_one(filter={'_id': email}) mailDocument = collection_to_use.find_one(filter={"_id": email})
if mailDocument is not None: if mailDocument is not None:
return MailAddress.from_firestore_dict(mailDocument).password return MailAddress.from_firestore_dict(mailDocument).password
else: else:
@@ -134,7 +173,9 @@ class MongoDbManager:
_cursor = self.db[_collection_name] _cursor = self.db[_collection_name]
registered_user_list = [] registered_user_list = []
for document in _cursor.find(): for document in _cursor.find():
registered_user_list.append(RegisteredUserPojo.from_firestore_dict(document)) registered_user_list.append(
RegisteredUserPojo.from_firestore_dict(document)
)
return registered_user_list return registered_user_list
def get_destination_emails(self) -> list: def get_destination_emails(self) -> list:
@@ -167,8 +208,18 @@ class MongoDbManager:
self.logger.info(error) self.logger.info(error)
return link_list return link_list
def link_validated_for_result(self, link: str, linkPojo: LinkPojo, state=True, is_duplicated=False, def link_validated_for_result(
is_invalid=False, segement_position=1, ua="", model="", timestamp_in_s: list = None): self,
link: str,
linkPojo: LinkPojo,
state=True,
is_duplicated=False,
is_invalid=False,
segement_position=1,
ua="",
model="",
timestamp_in_s: list = None,
):
if timestamp_in_s is None: if timestamp_in_s is None:
timestamp_in_s = [] timestamp_in_s = []
print("link_validated_for_result() called with url = " + link) print("link_validated_for_result() called with url = " + link)
@@ -181,7 +232,10 @@ class MongoDbManager:
print("link_validated_for_result() called with id = " + _id) print("link_validated_for_result() called with id = " + _id)
collection_name = str(datetime.date.today()) collection_name = str(datetime.date.today())
print("link_validated_for_result() called with collection_name = " + collection_name) print(
"link_validated_for_result() called with collection_name = "
+ collection_name
)
collection = self.db[collection_name] collection = self.db[collection_name]
validated_at = time.strftime("%H:%M:%S", time.localtime()) validated_at = time.strftime("%H:%M:%S", time.localtime())
@@ -190,18 +244,51 @@ class MongoDbManager:
validated_by = "Invalid" validated_by = "Invalid"
if is_duplicated: if is_duplicated:
validated_by = "Double" validated_by = "Double"
collection.find_one_and_update({'_id': _id}, { collection.find_one_and_update(
"$set": {"url_validated": state, "validated_at": validated_at, "id": _id, "email": linkPojo.email, {"_id": _id},
{
"$set": {
"url_validated": state,
"validated_at": validated_at,
"id": _id,
"email": linkPojo.email,
"url": link, "url": link,
"validated_by_model": model, "validated_by_model": model,
"serial": linkPojo.serial, "serial": linkPojo.serial,
"validated_by_ua": ua, "validated_by_ua": ua,
"timestamp_in_s": "-".join(str(x) for x in timestamp_in_s), "timestamp_in_s": "-".join(str(x) for x in timestamp_in_s),
"validated_by": validated_by}}, "validated_by": validated_by,
upsert=True) }
},
upsert=True,
)
# remove the link from db # remove the link from db
collection_to_use = self.db[LINKS_TO_VALIDATE] collection_to_use = self.db[LINKS_TO_VALIDATE]
collection_to_use.delete_one({'_id': linkPojo.email}) collection_to_use.delete_one({"_id": linkPojo.email})
# ── Mail read-time tracking ────────────────────────────────────
def get_last_mail_read_time(self, mail: str) -> Optional[datetime.datetime]:
    """Return the time at which ``mail`` was last read, or ``None``.

    The value is the ``last_read_at`` field of the ``MAIL_READ_LOG``
    document whose ``_id`` is the mailbox address (written by
    ``update_mail_read_time`` as a UTC timestamp). ``None`` is returned
    when no such document/field exists or when the lookup fails; failures
    are logged as warnings instead of being raised.
    """
    last_read = None
    try:
        record = self.db[MAIL_READ_LOG].find_one({"_id": mail})
        if record and "last_read_at" in record:
            last_read = record["last_read_at"]
    except Exception as exc:
        self.logger.warning("get_last_mail_read_time error: %s", exc)
    return last_read
def update_mail_read_time(self, mail: str) -> None:
    """Record the current UTC time as the last read time of ``mail``.

    Upserts the ``MAIL_READ_LOG`` document keyed by the mailbox address.
    Database errors are logged as warnings and not propagated.
    """
    # datetime.utcnow() is deprecated (Python 3.12+) and returns a naive
    # value. An aware UTC datetime denotes the same instant, so what ends
    # up stored for ``last_read_at`` is unchanged.
    now_utc = datetime.datetime.now(datetime.timezone.utc)
    try:
        self.db[MAIL_READ_LOG].replace_one(
            {"_id": mail},
            {"_id": mail, "last_read_at": now_utc},
            upsert=True,
        )
    except Exception as err:
        self.logger.warning("update_mail_read_time error: %s", err)
MONGO_STORE_MANAGER = MongoDbManager() MONGO_STORE_MANAGER = MongoDbManager()
+66
View File
@@ -0,0 +1,66 @@
"""
mail/
=====
Package de lecture IMAP pour le bot appointment_request.
Architecture (du plus bas au plus haut niveau) :
imap_proxy_reader — bibliothèque de bas niveau, sans dépendances internes
ProxyConfig dataclass de configuration proxy (SOCKS5/SOCKS4/HTTP)
ProxyIMAPClient IMAPClient passant par un proxy
get_imap_server(login) résolution domaine → serveur IMAP
extract_body(msg) extraction HTML/texte d'un email
send_imap_id(imap) spoofing fingerprint client (RFC 2971)
VALIDATION_URL_* constantes Hermes (source de vérité)
IMAP_SERVER_MAP table domaine → serveur
mail_constants IMAPClient avec fingerprint + fabrique create_imap()
FingerprintIMAPClient IMAPClient auto-ID après login
create_imap(login) fabrique → FingerprintIMAPClient sur le bon serveur
show_folders(imap) liste des dossiers (IMAPClient ou imaplib)
mail_reader_all_contacts logique métier de haut niveau
MailReader lit les emails d'un compte (direct ou proxy)
find_links_to_validate_from_mail_list() point d'entrée principal
"""
from mail.imap_proxy_reader import (
ProxyConfig,
ProxyIMAPClient,
get_imap_server,
extract_body,
send_imap_id,
VALIDATION_URL_SUBJECT_FR,
VALIDATION_URL_SUBJECT_EN,
VALIDATION_URL_REGEX,
IMAP_SERVER_MAP,
)
from mail.mail_constants import (
FingerprintIMAPClient,
create_imap,
show_folders,
)
from mail.mail_reader_all_contacts import (
MailReader,
find_links_to_validate_from_mail_list,
)
__all__ = [
# imap_proxy_reader
"ProxyConfig",
"ProxyIMAPClient",
"get_imap_server",
"extract_body",
"send_imap_id",
"VALIDATION_URL_SUBJECT_FR",
"VALIDATION_URL_SUBJECT_EN",
"VALIDATION_URL_REGEX",
"IMAP_SERVER_MAP",
# mail_constants
"FingerprintIMAPClient",
"create_imap",
"show_folders",
# mail_reader_all_contacts
"MailReader",
"find_links_to_validate_from_mail_list",
]
+658
View File
@@ -0,0 +1,658 @@
"""
imap_proxy_reader.py
====================
Lire des emails via IMAPClient en passant par un proxy SOCKS5/SOCKS4/HTTP.
Fonctionnement :
- ProxyIMAP4_TLS : sous-classe de imaplib.IMAP4 qui ouvre la socket
à travers un proxy SOCKS via PySocks.
- ProxyIMAPClient : sous-classe de IMAPClient qui injecte ProxyIMAP4_TLS
au lieu de la connexion directe habituelle.
Dépendances :
pip install imapclient PySocks
"""
import datetime
import email
import imaplib
import io
import logging
import os
import re
import ssl
import socket
from dataclasses import dataclass, field
from email.message import Message
from typing import List, Optional, Tuple
import random
import socks
from dotenv import load_dotenv
from imapclient import IMAPClient
load_dotenv()
# ──────────────────────────────────────────────────────────────
# Constants
# ──────────────────────────────────────────────────────────────
# Subject lines (French / English) that mark a message as a validation email.
# ProxyMailReader._read_folder tests them as substrings of the Subject header.
VALIDATION_URL_SUBJECT_FR = "Validation de votre demande de rendez-vous"
VALIDATION_URL_SUBJECT_EN = "Please confirm your appointment request"
# Pattern of the validation link searched for in message bodies:
# https://rendezvousparis.hermes.com/client/register/<ID>/validate?code=<CODE>
# where <ID> and <CODE> are upper-case letters/digits. The character class
# ``[?.]`` accepts either "?" or "." in front of ``code=``.
VALIDATION_URL_REGEX = (
    r"https:\/\/rendezvousparis\.hermes\.com"
    r"\/client\/register\/[A-Z0-9]+\/validate[?.]code=[A-Z0-9]+"
)
# strftime pattern "day-abbreviated month-year" (e.g. 05-Jan-2026).
# NOTE(review): not referenced in the visible part of this module — confirm
# whether it is still used elsewhere.
DATE_FORMAT = "%d-%b-%Y"
# Mail domain → IMAP server lookup table (originally mirrored from
# mail_constants.py). get_imap_server() scans the list in order and returns
# the server of the first entry that matches, so entry order is significant.
IMAP_SERVER_MAP: List[Tuple[str, str]] = [
    ("163.com", "imap.163.com"),
    ("yahoo.com", "imap.mail.yahoo.com"),
    ("firemail.de", "imap.firemail.de"),
    ("gmail.com", "imap.gmail.com"),
    ("sina.com", "imap.sina.com"),
    ("hotmail.com", "outlook.office365.com"),
    ("outlook.com", "outlook.office365.com"),
    ("rambler.ru", "imap.rambler.ru"),
    ("btvm.ne.jp", "imap.btvm.ne.jp"),
    ("mars.dti.ne.jp", "imap.cm.dream.jp"),
    ("aurora.dti.ne.jp", "imap.cm.dream.jp"),
    ("naver.com", "imap.naver.com"),
    ("onet.pl", "imap.poczta.onet.pl"),
    ("gazeta.pl", "imap.gazeta.pl"),
    ("tim.it", "imap.tim.it"),
    ("alice.it", "in.alice.it"),
    ("gmx.com", "imap.gmx.com"),
    ("gmx.fr", "imap.gmx.com"),
    ("gmx.us", "imap.gmx.com"),
    ("gmx.ch", "imap.gmx.com"),
    ("gmx.pt", "imap.gmx.com"),
    ("gmx.sg", "imap.gmx.com"),
    ("gmx.net", "imap.gmx.net"),
    ("gmx.de", "imap.gmx.net"),
    ("gmx.at", "imap.gmx.at"),
    ("web.de", "imap.web.de"),
    ("inbox.lv", "mail.inbox.lv"),
    ("pissmail.com", "mail.pissmail.com"),
    ("incel.email", "mail.pissmail.com"),
    ("shitposting.expert","mail.pissmail.com"),
    ("hatesje.ws", "mail.pissmail.com"),
    ("child.pizza", "mail.pissmail.com"),
    ("genocide.fun", "mail.pissmail.com"),
    ("dmc.chat", "mail.pissmail.com"),
    ("aol.com", "imap.aol.com"),  # AOL; also the default get_imap_server() returns when nothing matches
]
# Upper-case proxy type name → PySocks proxy-type constant.
# ProxyConfig.socks_type upper-cases its ``proxy_type`` before looking it up here.
PROXY_TYPE_MAP = {
    "SOCKS5": socks.SOCKS5,
    "SOCKS4": socks.SOCKS4,
    "HTTP": socks.HTTP,
}
# Module-level logger shared by every helper in this file.
logger = logging.getLogger(__name__)
# ──────────────────────────────────────────────────────────────
# Modèles de données
# ──────────────────────────────────────────────────────────────
@dataclass
class ProxyConfig:
    """Settings of the proxy that IMAP connections are tunnelled through.

    ``proxy_type`` is matched case-insensitively against the keys of
    ``PROXY_TYPE_MAP``. ``username``/``password`` are optional proxy
    credentials; the password never appears in ``repr()``.
    """

    host: str
    port: int
    proxy_type: str = "SOCKS5"  # "SOCKS5" | "SOCKS4" | "HTTP"
    username: Optional[str] = None
    password: Optional[str] = None

    @property
    def socks_type(self) -> int:
        """PySocks constant for ``proxy_type``.

        Raises:
            ValueError: if ``proxy_type`` is not a key of ``PROXY_TYPE_MAP``.
        """
        resolved = PROXY_TYPE_MAP.get(self.proxy_type.upper())
        if resolved is None:
            raise ValueError(f"proxy_type invalide : {self.proxy_type!r}. "
                             f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}")
        return resolved

    def __repr__(self) -> str:
        """URL-like rendering with the password masked as ``***``."""
        credentials = "" if not self.username else f"{self.username}:***@"
        return "{}://{}{}:{}".format(
            self.proxy_type, credentials, self.host, self.port
        )
@dataclass
class MailAccount:
    """Credentials of one mailbox to read."""

    # Full mailbox address; also used to pick the IMAP server.
    login: str
    # Kept out of the generated repr so the secret is not printed when an
    # account object ends up in a log line or a traceback.
    password: str = field(repr=False)
@dataclass
class MailResult:
    """One validation email found while reading a mailbox."""

    # Login of the mailbox the message was read from.
    account: str
    # Raw Subject / From / To header values of the message.
    subject: str
    from_address: str
    to_address: str
    # Text extracted from the message parts.
    body: str
    # Validation links found in ``body``; each instance gets its own list.
    validation_urls: List[str] = field(default_factory=list)
# ──────────────────────────────────────────────────────────────
# Connexion IMAP via proxy (bas niveau)
# ──────────────────────────────────────────────────────────────
class ProxyIMAP4_TLS(imaplib.IMAP4):
    """TLS flavour of ``imaplib.IMAP4`` whose socket goes through a proxy.

    The TCP connection is opened with PySocks (SOCKS5/SOCKS4/HTTP, as
    described by a ``ProxyConfig``) and then wrapped in TLS.
    """

    def __init__(
        self,
        host: str,
        port: int,
        ssl_context: Optional[ssl.SSLContext],
        proxy: ProxyConfig,
        timeout: Optional[float] = None,
    ):
        """Store the connection settings, then connect.

        Args:
            host: IMAP server host name.
            port: IMAP server port.
            ssl_context: TLS context; a default context is created when None.
            proxy: proxy to tunnel the connection through.
            timeout: socket timeout in seconds, or None for no explicit timeout.
        """
        # These attributes must exist before the base initializer runs,
        # because imaplib.IMAP4.__init__ calls self.open().
        self._ssl_context = ssl_context
        self._proxy = proxy
        self._timeout = timeout
        imaplib.IMAP4.__init__(self, host, port)
        self.file: io.BufferedReader

    def open(self, host: str = "", port: int = 993, timeout: Optional[float] = None) -> None:
        """Open the proxied TCP connection and perform the TLS handshake.

        ``timeout`` falls back to the value given to the constructor. If
        connecting or the handshake fails, the socket is closed before the
        exception is re-raised so that no descriptor is leaked.
        """
        self.host = host
        self.port = port
        effective_timeout = timeout if timeout is not None else self._timeout

        # ── Create the proxied socket ────────────────────────
        sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.set_proxy(
                proxy_type=self._proxy.socks_type,
                addr=self._proxy.host,
                port=self._proxy.port,
                username=self._proxy.username,
                password=self._proxy.password,
            )
            if effective_timeout:
                sock.settimeout(effective_timeout)
            sock.connect((host, port))

            # ── Wrap with SSL/TLS ────────────────────────────
            ctx = self._ssl_context or ssl.create_default_context()
            self.sock = ctx.wrap_socket(sock, server_hostname=host)
        except BaseException:
            # Proxy negotiation, TCP connect or TLS handshake failed:
            # release the socket instead of leaving it to the GC.
            sock.close()
            raise
        self.file = self.sock.makefile("rb")

    # ── Methods required by imaplib.IMAP4 ───────────────────
    def read(self, size: int) -> bytes:
        """Read ``size`` bytes from the server."""
        return self.file.read(size)  # type: ignore[return-value]

    def readline(self) -> bytes:
        """Read one line from the server."""
        return self.file.readline()  # type: ignore[return-value]

    def send(self, data) -> None:
        """Send ``data`` to the server."""
        self.sock.sendall(data)

    def shutdown(self) -> None:
        """Close the connection (delegates to ``imaplib.IMAP4.shutdown``)."""
        imaplib.IMAP4.shutdown(self)

    @staticmethod
    def _id_quote(value) -> str:
        """Render ``value`` as an IMAP quoted string.

        Backslashes are escaped before double quotes so that a value
        containing ``\\`` cannot swallow the closing quote.
        """
        escaped = str(value).replace("\\", "\\\\").replace('"', '\\"')
        return '"{}"'.format(escaped)

    def id(self, parameters: dict) -> tuple:
        """Send the IMAP ID command (RFC 2971).

        Args:
            parameters: field/value pairs, e.g.
                ``{"name": "Thunderbird", "version": "115.0"}``.

        Returns:
            The raw ``(typ, data)`` tuple returned by the server.
        """
        args = " ".join(
            self._id_quote(item)
            for pair in parameters.items()
            for item in pair
        )
        return self._simple_command("ID", "({})".format(args))
# ──────────────────────────────────────────────────────────────
# Profils de clients IMAP réels (pour spoofing du fingerprint)
# ──────────────────────────────────────────────────────────────
# IMAP ID field sets imitating real mail clients; one is picked at random
# by _random_imap_id_params().
_IMAP_CLIENT_PROFILES = [
    # Mozilla Thunderbird 115 (ESR) — Windows
    {
        "name": "Thunderbird",
        "version": "115.9.0",
        "vendor": "Mozilla",
        "support-url": "https://support.mozilla.org/",
        "command": "IMAP4rev1",
        "os": "Windows NT 10.0",
        "os-version": "10.0",
    },
    # Mozilla Thunderbird 115 — macOS
    {
        "name": "Thunderbird",
        "version": "115.9.0",
        "vendor": "Mozilla",
        "support-url": "https://support.mozilla.org/",
        "command": "IMAP4rev1",
        "os": "macOS",
        "os-version": "14.4",
    },
    # Apple Mail — macOS Sonoma
    {
        "name": "Mac OS X Mail",
        "version": "16.0",
        "vendor": "Apple Inc.",
        "support-url": "https://support.apple.com/mail",
        "os": "Mac OS X",
        "os-version": "14.4",
    },
    # Apple Mail — iOS
    {
        "name": "iPhone Mail",
        "version": "17.4",
        "vendor": "Apple Inc.",
        "os": "iOS",
        "os-version": "17.4",
    },
    # Outlook for Windows (MAPI/IMAP bridge)
    {
        "name": "Microsoft Outlook",
        "version": "16.0.17531.20108",
        "vendor": "Microsoft Corporation",
        "support-url": "https://support.microsoft.com/outlook",
        "os": "Windows NT 10.0",
        "os-version": "10.0",
    },
]


def _random_imap_id_params() -> dict:
    """Return a randomly chosen client profile from ``_IMAP_CLIENT_PROFILES``.

    A copy is returned so that a caller modifying the result cannot alter
    the shared module-level profile table.
    """
    return dict(random.choice(_IMAP_CLIENT_PROFILES))
def _format_imap_id_args(params: dict) -> str:
    """Render ``params`` as the space-separated quoted list of an ID command.

    Backslashes are escaped before double quotes so a value containing
    ``\\`` cannot break out of its quoted string.
    """
    return " ".join(
        '"{}"'.format(str(item).replace("\\", "\\\\").replace('"', '\\"'))
        for pair in params.items()
        for item in pair
    )


def send_imap_id(imap, params: Optional[dict] = None) -> None:
    """Send the IMAP ID command to present the client fingerprint in ``params``.

    Works with ``IMAPClient`` instances (the underlying imaplib object is
    reached through ``_imap``) and with plain imaplib connections. When the
    connection object exposes an ``id`` method it is used; otherwise the raw
    ``ID`` command is issued. Any failure is logged at debug level and
    ignored, so this call never raises.

    Args:
        imap: ``IMAPClient`` or ``imaplib.IMAP4``-like connection.
        params: ID fields to send; a random profile is chosen when None.
    """
    if params is None:
        params = _random_imap_id_params()
    try:
        # IMAPClient wraps the imaplib connection in ``_imap``; everything
        # else is treated as the imaplib-level object itself.
        raw = imap._imap if isinstance(imap, IMAPClient) else imap
        if hasattr(raw, "id"):
            raw.id(params)
        else:
            # No dedicated method: send the command directly.
            raw._simple_command(
                "ID", "({})".format(_format_imap_id_args(params))
            )
    except Exception as exc:
        logger.debug("IMAP ID non supporté ou ignoré : %s", exc)
class ProxyIMAPClient(IMAPClient):
    """``IMAPClient`` whose connection is routed through a SOCKS/HTTP proxy.

    Example:
        proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5")
        client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True)
        client.login("user@gmail.com", "password")
    """

    def __init__(self, host: str, proxy: ProxyConfig, **kwargs):
        """Remember ``proxy`` and forward everything else to ``IMAPClient``."""
        # Assigned before the base initializer runs so that the proxy is
        # already available when _create_IMAP4() is invoked.
        self._proxy = proxy
        super().__init__(host, **kwargs)

    def _create_IMAP4(self):
        """Build the proxied ``ProxyIMAP4_TLS`` connection.

        Raises:
            NotImplementedError: when the client was configured without SSL.
        """
        if not self.ssl:
            # Plain-text IMAP through the proxy is not supported.
            raise NotImplementedError(
                "Connexion IMAP non-SSL via proxy non implémentée. "
                "Utilisez ssl=True (port 993)."
            )

        # self._timeout is either a number of seconds or an object carrying
        # a ``connect`` attribute; reduce both forms to a plain number/None.
        connect_timeout = self._timeout
        is_plain_number = isinstance(connect_timeout, (int, float))
        if connect_timeout is not None and not is_plain_number:
            connect_timeout = getattr(connect_timeout, "connect", None)

        return ProxyIMAP4_TLS(
            host=self.host,
            port=self.port,
            ssl_context=self.ssl_context,
            proxy=self._proxy,
            timeout=connect_timeout,
        )

    def login(self, username: str, password: str):
        """Authenticate, then send the IMAP ID command; returns the login result."""
        outcome = super().login(username, password)
        send_imap_id(self)
        return outcome
# ──────────────────────────────────────────────────────────────
# Fonctions utilitaires
# ──────────────────────────────────────────────────────────────
def get_imap_server(login: str) -> str:
    """Return the IMAP server to use for the mailbox ``login``.

    The part after the last ``@`` is matched first against the domains of
    ``IMAP_SERVER_MAP`` (exact match or sub-domain), so that text in the
    local part — e.g. ``gmail.com.backup@web.de`` — can no longer select the
    wrong server. If that finds nothing, the historical substring scan over
    the whole login is applied, which keeps previously working logins
    (e.g. ``user@yahoo.com.tw``) resolving as before.

    Returns:
        The mapped server, or ``"imap.aol.com"`` when nothing matches.
    """
    login_lower = login.lower()

    # 1) Strict match on the domain part of the address.
    _, at_sign, mail_domain = login_lower.rpartition("@")
    if at_sign:
        for domain, server in IMAP_SERVER_MAP:
            if mail_domain == domain or mail_domain.endswith("." + domain):
                return server

    # 2) Legacy behaviour: first table domain contained anywhere in the login.
    for domain, server in IMAP_SERVER_MAP:
        if domain in login_lower:
            return server

    return "imap.aol.com"  # fallback
def extract_body(email_message: Message) -> str:
    """Return the concatenated text of all text/html and text/plain parts.

    Every part is transfer-decoded (base64 / quoted-printable) and then
    decoded with the charset it declares, falling back to UTF-8 when none
    is declared or the declared one is unknown. Undecodable bytes are
    dropped. A part that fails to decode is logged and skipped.

    Previously text/plain parts were appended without transfer decoding, so
    a quoted-printable body kept sequences such as ``code=3D...`` and the
    validation link in it could not be recognised.
    """
    body = ""
    for part in email_message.walk():
        if part.get_content_type() not in ("text/html", "text/plain"):
            continue
        try:
            # decode=True undoes the Content-Transfer-Encoding and yields
            # bytes (None for multipart containers).
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                body += payload.decode(charset, errors="ignore")
            except LookupError:
                # Charset name not known to Python.
                body += payload.decode("utf-8", errors="ignore")
        except Exception as exc:
            logger.warning("Erreur extraction body : %s", exc)
    return body
def find_validation_urls(text: str) -> List[str]:
    """Return every substring of ``text`` that matches ``VALIDATION_URL_REGEX``.

    Matches are returned in order of appearance; the list is empty when
    there is none.
    """
    # The pattern has no capture groups, so group(0) is the whole link.
    return [hit.group(0) for hit in re.finditer(VALIDATION_URL_REGEX, text)]
# ──────────────────────────────────────────────────────────────
# Lecteur principal
# ──────────────────────────────────────────────────────────────
class ProxyMailReader:
    """Read the validation emails of one mailbox through a proxy.

    Args:
        account: credentials of the mailbox.
        proxy: proxy the IMAP connection is tunnelled through.
        timeout: connection timeout in seconds (default 30 s).
    """

    def __init__(
        self,
        account: MailAccount,
        proxy: ProxyConfig,
        timeout: float = 30.0,
    ):
        self.account = account
        self.proxy = proxy
        self.timeout = timeout

    # ── Connection ───────────────────────────────────────────
    def _connect(self) -> ProxyIMAPClient:
        """Open the proxied IMAP connection and log in.

        If authentication fails the connection is closed before the
        exception is re-raised, so a rejected login does not leak a socket.
        """
        imap_server = get_imap_server(self.account.login)
        logger.info(
            "[%s] Connexion via %s → %s:993",
            self.account.login, self.proxy, imap_server,
        )
        client = ProxyIMAPClient(
            host=imap_server,
            proxy=self.proxy,
            use_uid=True,
            ssl=True,
            timeout=self.timeout,
        )
        try:
            client.login(self.account.login, self.account.password)
        except Exception:
            try:
                client.shutdown()
            except Exception as close_exc:
                logger.debug(
                    "[%s] Fermeture après échec de login impossible : %s",
                    self.account.login, close_exc,
                )
            raise
        logger.info("[%s] Connecté.", self.account.login)
        return client

    # ── Folder listing ───────────────────────────────────────
    def _list_folders(self, client: ProxyIMAPClient) -> List[str]:
        """Return the folder names (last item of each list_folders() entry)."""
        return [info[-1] for info in client.list_folders()]

    # ── Message reading ──────────────────────────────────────
    def _read_folder(
        self,
        client: ProxyIMAPClient,
        folder: str,
        since: Optional[datetime.datetime] = None,
    ) -> List[MailResult]:
        """Return the validation emails found in ``folder`` since ``since``.

        The folder is opened read-only. A folder that cannot be opened,
        searched or fetched is logged and yields the results gathered so
        far (an empty list), so one bad folder does not abort the mailbox.
        ``since`` defaults to today.
        """
        results: List[MailResult] = []
        since = since or datetime.datetime.today()

        try:
            client.select_folder(folder, readonly=True)
        except Exception as exc:
            logger.warning("[%s] Impossible d'ouvrir '%s' : %s",
                           self.account.login, folder, exc)
            return results

        try:
            uids = client.search(["SINCE", since])
        except Exception as exc:
            logger.warning("[%s] Recherche échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        if not uids:
            return results

        logger.info("[%s] %d message(s) dans '%s'",
                    self.account.login, len(uids), folder)

        # Handled like select/search above: a failed fetch must not discard
        # what was already collected from other folders.
        try:
            fetched = client.fetch(uids, "RFC822")
        except Exception as exc:
            logger.warning("[%s] Lecture échouée dans '%s' : %s",
                           self.account.login, folder, exc)
            return results

        for uid, msg_data in fetched.items():
            try:
                raw = msg_data.get(b"RFC822") or msg_data.get("RFC822")
                if raw is None:
                    continue

                em = email.message_from_bytes(raw)
                # str() because a header containing raw non-ASCII bytes is
                # returned as a Header object rather than a str.
                subject = str(em.get("Subject", ""))
                from_addr = str(em.get("From", ""))
                to_addr = str(em.get("To", self.account.login))

                # Keep only validation emails: known subject line or
                # sent from the no-reply address.
                is_validation = (
                    VALIDATION_URL_SUBJECT_FR in subject
                    or VALIDATION_URL_SUBJECT_EN in subject
                    or "no-reply@hermes.com" in from_addr.lower()
                )
                if not is_validation:
                    continue

                body = extract_body(em)
                urls = find_validation_urls(body)
                results.append(
                    MailResult(
                        account=self.account.login,
                        subject=subject,
                        from_address=from_addr,
                        to_address=to_addr,
                        body=body,
                        validation_urls=urls,
                    )
                )
                logger.info(
                    "[%s] Email de validation trouvé (uid=%s) — URLs : %s",
                    self.account.login, uid, urls or "aucune",
                )
            except Exception as exc:
                logger.warning(
                    "[%s] Erreur traitement uid=%s : %s",
                    self.account.login, uid, exc,
                )

        return results

    # ── Public entry point ───────────────────────────────────
    def read(
        self,
        since: Optional[datetime.datetime] = None,
        skip_folders: Optional[List[str]] = None,
    ) -> List[MailResult]:
        """Connect through the proxy and return the validation emails found.

        Args:
            since: start date of the search (today when None).
            skip_folders: folder names to ignore, compared exactly
                (default: ["Sent", "Drafts", "Trash", "Junk", "Spam"]).

        Raises:
            Whatever connecting, logging in or listing folders raises.
        """
        if skip_folders is None:
            skip_folders = ["Sent", "Drafts", "Trash", "Junk", "Spam"]

        all_results: List[MailResult] = []
        client = self._connect()
        try:
            folders = self._list_folders(client)
            logger.info("[%s] Dossiers : %s", self.account.login, folders)

            for folder in folders:
                if folder in skip_folders:
                    logger.debug("[%s] Dossier ignoré : %s",
                                 self.account.login, folder)
                    continue
                all_results.extend(self._read_folder(client, folder, since))
        finally:
            # Best-effort logout: a failure here must not mask the result
            # or the original exception, but is still worth a trace.
            try:
                client.logout()
            except Exception as exc:
                logger.debug("[%s] Logout ignoré : %s",
                             self.account.login, exc)

        return all_results
# ──────────────────────────────────────────────────────────────
# Lecture parallèle de plusieurs comptes
# ──────────────────────────────────────────────────────────────
from concurrent.futures import ThreadPoolExecutor, as_completed
def read_multiple_accounts(
    accounts: List[MailAccount],
    proxy: ProxyConfig,
    since: Optional[datetime.datetime] = None,
    max_workers: int = 10,
    timeout: float = 30.0,
) -> List[MailResult]:
    """Read several mailboxes concurrently through one proxy.

    One ``ProxyMailReader.read`` call per account is submitted to a thread
    pool of ``max_workers`` threads. Results are merged in completion order.
    An account whose read raises is logged as an error and contributes
    nothing; the other accounts are unaffected.

    Returns:
        All ``MailResult`` objects found across the accounts.
    """
    collected: List[MailResult] = []

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # future → login of the account it reads, for log messages.
        pending = {}
        for account in accounts:
            reader = ProxyMailReader(account, proxy, timeout)
            pending[pool.submit(reader.read, since)] = account.login

        for finished in as_completed(pending):
            owner = pending[finished]
            try:
                found = finished.result()
                logger.info("[%s] %d email(s) de validation récupéré(s).",
                            owner, len(found))
                collected.extend(found)
            except Exception as exc:
                logger.error("[%s] Erreur : %s", owner, exc)

    return collected
# ──────────────────────────────────────────────────────────────
# Point d'entrée — exemple d'utilisation
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Usage example: read one mailbox through the proxy described by the
    # GMX_PROXY_* environment variables and print the validation emails.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%H:%M:%S",
    )

    # ── 1. Configure the proxy ───────────────────────────────
    proxy = ProxyConfig(
        host=os.environ.get("GMX_PROXY_HOST", ""),
        port=int(os.environ.get("GMX_PROXY_PORT", "443")),
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )

    # ── 2. Define the account to read ────────────────────────
    # Mailbox credentials come from the environment instead of being
    # hard-coded in the source.
    # NOTE(review): a real login/password used to be committed here; that
    # password should be considered leaked and rotated.
    demo_login = os.environ.get("IMAP_TEST_LOGIN")
    demo_password = os.environ.get("IMAP_TEST_PASSWORD")
    if not demo_login or not demo_password:
        raise SystemExit(
            "IMAP_TEST_LOGIN and IMAP_TEST_PASSWORD environment variables must be set"
        )
    accounts = [
        MailAccount(login=demo_login, password=demo_password),
    ]

    # ── 3. Run the read ──────────────────────────────────────
    results = read_multiple_accounts(
        accounts=accounts,
        proxy=proxy,
        since=datetime.datetime.today(),
        max_workers=5,
        timeout=30.0,
    )

    # ── 4. Print the results ─────────────────────────────────
    print(f"\n{'='*60}")
    print(f" {len(results)} email(s) de validation trouvé(s)")
    print(f"{'='*60}\n")
    for r in results:
        print(f" Compte : {r.account}")
        print(f" De : {r.from_address}")
        print(f" Sujet : {r.subject}")
        print(f" URLs : {r.validation_urls or 'aucune'}")
        print(f" {'-'*56}")
+54 -84
View File
@@ -1,8 +1,22 @@
"""
mail_constants.py
=================
Constantes de domaine email et fabrique d'instances IMAPClient.
Architecture mail/ :
imap_proxy_reader ← bibliothèque de bas niveau (proxy, IMAP ID, extract_body, server map)
mail_constants ← cette couche : FingerprintIMAPClient + create_imap() pour les comptes directs
mail_reader_all_contacts ← logique métier haut niveau (MailReader, find_links_to_validate_…)
"""
import imaplib import imaplib
from imapclient import IMAPClient from imapclient import IMAPClient
from mail.imap_proxy_reader import send_imap_id, get_imap_server
# ── Constantes de domaine (conservées pour la compatibilité des imports externes) ──
# 邮件域名常量
DOMAIN_YAHOO = "yahoo.com" DOMAIN_YAHOO = "yahoo.com"
DOMAIN_SINA = "sina.com" DOMAIN_SINA = "sina.com"
DOMAIN_HOTMAIL = "hotmail.com" DOMAIN_HOTMAIL = "hotmail.com"
@@ -27,8 +41,6 @@ DOMAIN_GAZETA_PL = "gazeta.pl"
DOMAIN_NAVER = "naver.com" DOMAIN_NAVER = "naver.com"
DOMAIN_INBOX_LV = "inbox.lv" DOMAIN_INBOX_LV = "inbox.lv"
DOMAIN_GMX_DE = "gmx.de" DOMAIN_GMX_DE = "gmx.de"
# 垃圾邮件域名
DOMAIN_PISS_MAIL = "pissmail.com" DOMAIN_PISS_MAIL = "pissmail.com"
DOMAIN_INCEL_EMAIL = "incel.email" DOMAIN_INCEL_EMAIL = "incel.email"
DOMAIN_SHITPOSTING_EXPERT = "shitposting.expert" DOMAIN_SHITPOSTING_EXPERT = "shitposting.expert"
@@ -40,93 +52,51 @@ DOMAIN_WEB_DE = "web.de"
DOMAIN_OUTLOOK_COM = "outlook.com" DOMAIN_OUTLOOK_COM = "outlook.com"
DOMAIN_FIREMAIL_DE = "firemail.de" DOMAIN_FIREMAIL_DE = "firemail.de"
# IMAP服务器地址常量 # ── Note : les constantes IMAP_SERVER_* ont été supprimées. ───────────────────
AOL_IMAP_SERVER = "imap.aol.com" # Utiliser imap_proxy_reader.IMAP_SERVER_MAP ou imap_proxy_reader.get_imap_server(login)
IMAP_SERVER_163 = "imap.163.com" # pour obtenir le serveur IMAP correspondant à un domaine.
IMAP_SERVER_SINA = "imap.sina.com"
YAHOO_IMAP_SERVER = "imap.mail.yahoo.com"
HOTMAIL_IMAP_SERVER = "outlook.office365.com"
RAMBLER_IMAP_SERVER = "imap.rambler.ru"
ALICE_IMAP_SERVER = "in.alice.it" # ── IMAPClient avec IMAP ID spoofing ─────────────────────────────────────────
TIME_IT_SERVER = "imap.tim.it"
MARS_DTI_NE_JP_SERVER = "imap.cm.dream.jp" class FingerprintIMAPClient(IMAPClient):
NAVER_SERVER = "imap.naver.com" """
BTVM_NE_JP_SERVER = "imap.btvm.ne.jp" IMAPClient qui envoie automatiquement la commande IMAP ID (RFC 2971)
GMAIL_IMAP_SERVER = "imap.gmail.com" après chaque login(), pour usurper le fingerprint d'un vrai client mail.
ONET_IMAP_SERVER = "imap.poczta.onet.pl" """
GMX_IMAP_SERVER = "imap.gmx.com"
GMX_NET_IMAP_SERVER = "imap.gmx.net" def login(self, username: str, password: str):
GMX_AT_IMAP_SERVER = "imap.gmx.at" result = super().login(username, password)
FIREMAIL_DE_IMAP_SERVER = "imap.firemail.de" send_imap_id(self)
PISS_MAIL_IMAP_SERVER = "mail.pissmail.com" return result
INBOX_LV_IMAP_SERVER = "mail.inbox.lv"
WEB_DE_IMAP_SERVER = "imap.web.de"
GAZETA_PL_IMAP_SERVER = "imap.gazeta.pl" # ── Fabrique d'instances IMAPClient ──────────────────────────────────────────
def show_folders(imap) -> list: def show_folders(imap) -> list:
"""Retourne la liste des dossiers IMAP (compatible IMAPClient et imaplib)."""
folders = [] folders = []
isImapClient = isinstance(imap, IMAPClient) is_imap_client = isinstance(imap, IMAPClient)
if not isImapClient: if not is_imap_client:
for i in imap.list()[1]: for i in imap.list()[1]:
l = i.decode().split(' "/" ') parts = i.decode().split(' "/" ')
if len(l) > 1: if len(parts) > 1:
folders.append(l[1]) folders.append(parts[1])
if len(folders) == 0: if not folders:
folders.append('INBOX') folders.append("INBOX")
return folders
else: else:
list = imap.list_folders() for info in imap.list_folders():
for i in list: folders.append(info[-1])
name = i[-1]
folders.append(name)
return folders return folders
def create_imap(login: str): def create_imap(login: str) -> FingerprintIMAPClient:
# 创建一个IMAP4类实例 """
if DOMAIN_163 in login: Crée et retourne un FingerprintIMAPClient connecté au bon serveur IMAP
imap = IMAPClient(IMAP_SERVER_163, use_uid=True) pour le domaine de l'adresse email fournie.
elif DOMAIN_YAHOO in login:
imap = IMAPClient(YAHOO_IMAP_SERVER, use_uid=True) La résolution domaine → serveur est déléguée à get_imap_server()
elif DOMAIN_FIREMAIL_DE in login: (défini dans imap_proxy_reader, source de vérité unique).
imap = IMAPClient(FIREMAIL_DE_IMAP_SERVER, use_uid=True) """
elif DOMAIN_GMX in login or DOMAIN_GMX_FR in login or DOMAIN_GMX_US in login or DOMAIN_GMX_CH in login or DOMAIN_GMX_PT in login or DOMAIN_GMX_SG in login: server = get_imap_server(login)
imap = IMAPClient(GMX_IMAP_SERVER, use_uid=True) return FingerprintIMAPClient(server, use_uid=True)
elif DOMAIN_SINA in login:
imap = IMAPClient(IMAP_SERVER_SINA, use_uid=True)
elif DOMAIN_HOTMAIL in login or DOMAIN_OUTLOOK_COM in login:
imap = IMAPClient(HOTMAIL_IMAP_SERVER, use_uid=True)
elif DOMAIN_RAMBLER_RU in login:
imap = IMAPClient(RAMBLER_IMAP_SERVER, use_uid=True)
elif DOMAIN_BTVM_NE_JP in login:
imap = IMAPClient(BTVM_NE_JP_SERVER, use_uid=True)
elif DOMAIN_GMAIL in login:
imap = IMAPClient(GMAIL_IMAP_SERVER, use_uid=True)
elif DOMAIN_ONET in login:
imap = IMAPClient(ONET_IMAP_SERVER, use_uid=True)
elif DOMAIN_TIM_IT in login:
imap = IMAPClient(TIME_IT_SERVER, use_uid=True)
elif DOMAIN_ALICE_IT in login:
imap = IMAPClient(ALICE_IMAP_SERVER, use_uid=True)
elif DOMAIN_MARS_DTI_NE_JP in login:
imap = IMAPClient(MARS_DTI_NE_JP_SERVER, use_uid=True)
elif DOMAIN_AURORA_DTI_NE_JP in login:
imap = IMAPClient(MARS_DTI_NE_JP_SERVER, use_uid=True)
elif DOMAIN_NAVER in login:
imap = IMAPClient(NAVER_SERVER, use_uid=True)
elif DOMAIN_GMX_DE in login or DOMAIN_GMX_NET in login:
imap = IMAPClient(GMX_NET_IMAP_SERVER, use_uid=True)
elif DOMAIN_GMX_AT in login:
imap = IMAPClient(GMX_AT_IMAP_SERVER, use_uid=True)
elif DOMAIN_GAZETA_PL in login:
imap = IMAPClient(GAZETA_PL_IMAP_SERVER, use_uid=True)
elif DOMAIN_INBOX_LV in login:
imap = IMAPClient(INBOX_LV_IMAP_SERVER, use_uid=True)
elif DOMAIN_WEB_DE in login:
imap = IMAPClient(WEB_DE_IMAP_SERVER, use_uid=True)
elif DOMAIN_PISS_MAIL in login or DOMAIN_CHILD_PIZZA in login or DOMAIN_DMC_CHAT in login or DOMAIN_GENOCIDE_FUN in login or DOMAIN_HATESJE_WS in login or DOMAIN_INCEL_EMAIL in login or DOMAIN_SHITPOSTING_EXPERT in login:
imap = IMAPClient(PISS_MAIL_IMAP_SERVER, use_uid=True)
else:
imap = IMAPClient(AOL_IMAP_SERVER, use_uid=True)
return imap
+326 -143
View File
@@ -1,29 +1,92 @@
import datetime import datetime
import email import email
import logging import logging
import os
import random
import re import re
from concurrent.futures import ThreadPoolExecutor import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from email.header import decode_header from email.header import decode_header
from email.message import Message from typing import Union, List, Optional, Dict
from typing import Union, List
from imapclient import IMAPClient from dotenv import load_dotenv
from db.mongo_manager import MONGO_STORE_MANAGER from db.mongo_manager import MONGO_STORE_MANAGER
from excel_reader import read_contacts from excel_reader import read_contacts
from mail.mail_constants import DOMAIN_HOTMAIL, create_imap from mail.mail_constants import DOMAIN_HOTMAIL, create_imap, show_folders
from mail.imap_proxy_reader import (
ProxyIMAPClient, ProxyConfig, get_imap_server,
extract_body,
VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN,
VALIDATION_URL_REGEX, DATE_FORMAT,
)
from imapclient import IMAPClient
from models.ReserveResultPojo import ReserveResultPojo from models.ReserveResultPojo import ReserveResultPojo
from models.mail_pojo import MailPojo, MailAddress from models.mail_pojo import MailPojo, MailAddress
# 定义常量 # Charger les variables d'environnement depuis .env
VALIDATION_URL_SUBJECT_FR = 'Validation de votre demande de rendez-vous' load_dotenv()
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request'
VALIDATION_URL_REGEX = r"https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+" # ── Constantes locales ────────────────────────────────────────────────────────
# VALIDATION_URL_SUBJECT_FR, VALIDATION_URL_SUBJECT_EN, VALIDATION_URL_REGEX,
# DATE_FORMAT sont importés depuis imap_proxy_reader (source de vérité unique).
PART_VALIDATION_URL_REGEX = r"client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+" PART_VALIDATION_URL_REGEX = r"client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"
HERMES_EMAIL = "no-reply@hermes.com" HERMES_EMAIL = "no-reply@hermes.com"
EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b' EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,7}\b'
# 日期格式 # Timeouts GMX (en secondes)
DATE_FORMAT = "%d-%b-%Y" IMAP_SOCKET_TIMEOUT = 300 # timeout socket pour chaque opération IMAP
FUTURE_TIMEOUT = 600 # durée max allouée à la lecture d'une boîte mail
# Maximum number of concurrent worker threads per domain group, so that a
# single mail provider is not hit hard enough to trigger its abuse controls.
MAX_WORKERS_PER_DOMAIN: Dict[str, int] = {
    "gmx": 80,
    "aol": 5,
    "gmail": 3,
    "yahoo": 5,
    "outlook": 5,
    "hotmail": 5,
    "firemail": 5,
    "inbox.lv": 5,
    "default": 5,
}

# Minimum interval (in minutes) between two reads of the same mailbox, to
# avoid logging in to the same account repeatedly in a short time.
MAIL_READ_MIN_INTERVAL_MINUTES = 15

# GMX mail domains (also the base of PROXY_DOMAINS below).
GMX_DOMAINS = (
    "gmx.com", "gmx.net", "gmx.de", "gmx.at",
    "gmx.fr", "gmx.us", "gmx.sg", "gmx.ch", "gmx.pt",
)

# Domains whose mailboxes are read through the proxy.
# To proxy GMX accounts only, use: PROXY_DOMAINS = GMX_DOMAINS
PROXY_DOMAINS = GMX_DOMAINS + ("yahoo.com",)


def _mail_domain(login: str) -> str:
    """Return the lower-cased domain part of *login* (text after the last "@").

    If *login* contains no "@", the whole (stripped, lower-cased) string is
    returned, so a bare domain can also be passed.
    """
    return login.strip().lower().rpartition("@")[2]


def _domain_matches(login: str, domains) -> bool:
    """Return True if the domain of *login* is one of *domains* or a subdomain of one."""
    domain = _mail_domain(login)
    return any(domain == d or domain.endswith("." + d) for d in domains)


def is_gmx_account(login: str) -> bool:
    """Return True if *login* is an address on one of GMX_DOMAINS.

    Only the domain part is compared, so a GMX domain name appearing in the
    local part (or as the tail of another domain) does not match.
    """
    return _domain_matches(login, GMX_DOMAINS)


def is_proxy_account(login: str) -> bool:
    """Return True if *login* must be read through the proxy.

    This is the case for every domain listed in PROXY_DOMAINS
    (the GMX domains plus yahoo.com).
    """
    return _domain_matches(login, PROXY_DOMAINS)


def get_domain_group(login: str) -> str:
    """Map an email address to its throttling group key.

    The key is the first entry of MAX_WORKERS_PER_DOMAIN (other than
    "default") that occurs in the domain part of *login*; "default" is
    returned when none does.

    Examples: "user@gmx.net" -> "gmx", "user@aol.com" -> "aol".
    """
    domain = _mail_domain(login)
    for key in MAX_WORKERS_PER_DOMAIN:
        # Match against the domain only: the local part must not pick the group.
        if key != "default" and key in domain:
            return key
    return "default"
# 邮箱列表(简化为常量) # 邮箱列表(简化为常量)
REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,rutger.62@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,lacerdaraffaello@aol.com,anasida76@aol.com,liamolinari@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,kotensasharev@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,damonsvensson@aol.com,anders.riva@aol.com,caiminwei123@gmail.com,yulingguo086@gmail.com,yingxiaolu086@gmail.com,lijiazhen0035@gmail.com,fangp370@gmail.com,huangyayu10086@gmail.com,fuziyuan110@gmail.com,xinyingdu886@gmail.com,yasiaforever.1971@aol.com,lukaszfidalgo@aol.com,zaichi29@aol.com,prostotakitak.1974@aol.com,mo90nroe@aol.com,blonde.87@aol.com,dimidrol.1969@aol.com" REDIRECTION_MAILS = "appointment2022@aol.com, 
chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,rutger.62@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,lacerdaraffaello@aol.com,anasida76@aol.com,liamolinari@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,kotensasharev@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,damonsvensson@aol.com,anders.riva@aol.com,caiminwei123@gmail.com,yulingguo086@gmail.com,yingxiaolu086@gmail.com,lijiazhen0035@gmail.com,fangp370@gmail.com,huangyayu10086@gmail.com,fuziyuan110@gmail.com,xinyingdu886@gmail.com,yasiaforever.1971@aol.com,lukaszfidalgo@aol.com,zaichi29@aol.com,prostotakitak.1974@aol.com,mo90nroe@aol.com,blonde.87@aol.com,dimidrol.1969@aol.com"
@@ -64,33 +127,64 @@ def find_from_mail(param) -> str:
class MailReader: class MailReader:
"""邮件读取器类""" """邮件读取器类"""
def __init__(self, login: str, password: str): def __init__(self, login: str, password: str, proxy: Optional[ProxyConfig] = None,
failed_gmx_list: Optional[List[str]] = None,
delay_range: tuple = (1.0, 5.0)):
self.login = login self.login = login
self.password = password self.password = password
self.proxy = proxy
@staticmethod self.failed_gmx_list = failed_gmx_list if failed_gmx_list is not None else []
def show_folders(imap) -> List[str]: self.delay_range = delay_range # (min_seconds, max_seconds) 随机延迟范围
"""获取邮箱文件夹列表"""
folders = []
is_imap_client = isinstance(imap, IMAPClient)
if not is_imap_client:
# 处理非IMAPClient对象
for i in imap.list()[1]:
l = i.decode().split(' "/" ')
folders.append(l[1])
else:
# 处理IMAPClient对象
folder_list = imap.list_folders()
for i in folder_list:
name = i[-1]
folders.append(name)
return folders
def read_emails(self, mails_messages: List[MailPojo]) -> List[MailPojo]: def read_emails(self, mails_messages: List[MailPojo]) -> List[MailPojo]:
"""读取邮件""" """读取邮件(含随机延迟和读取时间记录)"""
imap = create_imap(self.login) # 随机延迟,模拟人工节奏,降低被识别为机器人的概率
_delay = random.uniform(*self.delay_range)
time.sleep(_delay)
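# ── Proxy-routed accounts (PROXY_DOMAINS: GMX / yahoo.com) → connect through the proxy (automatic retry on failure, up to max_retries attempts, default 8) ──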
if is_proxy_account(self.login) and self.proxy is not None:
result = self._read_emails_with_proxy_retry(mails_messages)
else:
result = self._read_emails_internal(create_imap(self.login), mails_messages)
# 记录本次读取时间,供下次调用的 need_to_check_email 判断间隔
MONGO_STORE_MANAGER.update_mail_read_time(self.login)
return result
def _read_emails_with_proxy_retry(
    self,
    mails_messages: List[MailPojo],
    max_retries: int = 8,
) -> List[MailPojo]:
    """Read this mailbox through a ProxyIMAPClient, retrying on failure.

    The IMAP host is resolved once with get_imap_server(). Each attempt opens
    a fresh ProxyIMAPClient over self.proxy and hands it to
    _read_emails_internal(); the first attempt that completes returns its
    result. Any exception ends the attempt and is reported on stdout.

    When all ``max_retries`` attempts fail, the login is appended to
    self.failed_gmx_list and an empty list is returned.
    """
    server_host = get_imap_server(self.login)
    failure: Optional[Exception] = None
    attempt_no = 0

    while attempt_no < max_retries:
        attempt_no += 1
        try:
            banner = "[Proxy] {}{} via {} (tentative {}/{})".format(
                self.login, server_host, self.proxy, attempt_no, max_retries)
            print(banner)
            client = ProxyIMAPClient(
                host=server_host,
                proxy=self.proxy,
                use_uid=True,
                ssl=True,
                timeout=IMAP_SOCKET_TIMEOUT,
            )
            return self._read_emails_internal(client, mails_messages)
        except Exception as exc:
            # Remember the most recent error for the final summary line.
            failure = exc
            print("[Proxy] Échec tentative {}/{} pour {} : {}".format(
                attempt_no, max_retries, self.login, exc))

    # Every attempt failed: report it and record the account as unread.
    print("[Proxy] Toutes les tentatives ont échoué pour {} : {}".format(
        self.login, failure))
    self.failed_gmx_list.append(self.login)
    return []
def _read_emails_internal(self, imap, mails_messages: List[MailPojo]) -> List[MailPojo]:
"""Logique commune de lecture des emails (IMAPClient ou imaplib)."""
is_imap_client = isinstance(imap, IMAPClient) is_imap_client = isinstance(imap, IMAPClient)
# 登录邮箱 # 登录邮箱
@@ -104,8 +198,8 @@ class MailReader:
mail_list = [] mail_list = []
print("read mails from {}".format(self.login)) print("read mails from {}".format(self.login))
# 获取文件夹列表 # 获取文件夹列表(委托给 mail_constants.show_folders
folder_list = self.show_folders(imap) folder_list = show_folders(imap)
# 处理每个文件夹 # 处理每个文件夹
for folder in folder_list: for folder in folder_list:
@@ -133,28 +227,37 @@ class MailReader:
return mail_list return mail_list
def _get_messages_from_folder(self, imap, subject: str, folder: str = "INBOX") -> List[MailPojo]: def _get_messages_from_folder(self, imap, subject: str, folder: str = "INBOX") -> List[MailPojo]:
"""从指定文件夹获取邮件(传统IMAP方式)""" """从指定文件夹获取邮件(传统IMAP方式,批量fetch减少往返次数"""
imap.select(folder) imap.select(folder)
mail_messages = [] mail_messages = []
# 搜索邮件 # 搜索符合条件的所有邮件ID(服务器端同时过滤发件人,减少无关邮件下载量)
search_query = '(SUBJECT "{}" SINCE "{}")'.format(subject, datetime.datetime.today().strftime(DATE_FORMAT)) search_query = '(FROM "{}" SUBJECT "{}" SINCE "{}")'.format(
HERMES_EMAIL, subject, datetime.datetime.today().strftime(DATE_FORMAT))
typ, data = imap.search(None, search_query) typ, data = imap.search(None, search_query)
for i in data[0].split(): ids = data[0].split()
try: if not ids:
# 获取邮件内容 return mail_messages
res, msg = imap.fetch(i.decode("utf-8"), "(RFC822)")
# 解析邮件 # 批量fetch:一次请求取回所有匹配邮件,减少 N 次往返为 1 次
for response in msg: id_list = b",".join(ids)
if isinstance(response, tuple): try:
res, msg_list = imap.fetch(id_list, "(RFC822)")
except Exception as error:
print("Batch fetch error in folder {}: {}".format(folder, error))
return mail_messages
for response in msg_list:
if not isinstance(response, tuple):
continue
try:
email_message = email.message_from_bytes(response[1]) email_message = email.message_from_bytes(response[1])
# 解码主题 # 解码主题
subject, subject_encoded = decode_header(email_message["Subject"])[0] subject_decoded, subject_encoded = decode_header(email_message["Subject"])[0]
if isinstance(subject, bytes): if isinstance(subject_decoded, bytes):
subject = subject.decode(subject_encoded) subject_decoded = subject_decoded.decode(subject_encoded)
# 解码发件人地址 # 解码发件人地址
from_address = find_from_mail(decode_header(email_message.get("From"))) from_address = find_from_mail(decode_header(email_message.get("From")))
@@ -165,15 +268,15 @@ class MailReader:
print("Email:", self.login) print("Email:", self.login)
print("From:", from_address) print("From:", from_address)
print("To:", to_email) print("To:", to_email)
print("Subject:", subject) print("Subject:", subject_decoded)
# 获取邮件正文 # 获取邮件正文(委托给 imap_proxy_reader.extract_body
body = self._extract_body(email_message) body = extract_body(email_message)
# 检查是否是预约验证邮件 # 检查是否是预约验证邮件
if VALIDATION_URL_SUBJECT_FR in subject or VALIDATION_URL_SUBJECT_EN in subject: if VALIDATION_URL_SUBJECT_FR in subject_decoded or VALIDATION_URL_SUBJECT_EN in subject_decoded:
mail = MailPojo( mail = MailPojo(
subject=subject, subject=subject_decoded,
body=body, body=body,
from_address=from_address from_address=from_address
) )
@@ -191,30 +294,6 @@ class MailReader:
return mail_messages return mail_messages
def _extract_body(self, email_message: Message) -> str:
"""提取邮件正文"""
body = ""
# 遍历邮件部分
for part in email_message.walk():
try:
content_type = part.get_content_type()
if content_type == "text/html":
# 处理HTML内容
payload = part.get_payload(decode=True)
if payload:
body += payload.decode("utf-8", errors="ignore")
elif content_type == "text/plain":
# 处理纯文本内容
payload = part.get_payload()
if payload:
body += payload
except Exception as error:
print("Error extracting body part: {}".format(error))
return body
def _get_messages_from_folder_for_imapclient(self, imap, folder: str = "INBOX") -> List[MailPojo]: def _get_messages_from_folder_for_imapclient(self, imap, folder: str = "INBOX") -> List[MailPojo]:
"""从指定文件夹获取邮件(IMAPClient方式)""" """从指定文件夹获取邮件(IMAPClient方式)"""
mail_messages = [] mail_messages = []
@@ -225,7 +304,7 @@ class MailReader:
print("{}: search terms is {}".format(self.login, search_terms)) print("{}: search terms is {}".format(self.login, search_terms))
imap.select_folder(folder) imap.select_folder(folder)
messages = imap.search(['SINCE', datetime.datetime.today()]) messages = imap.search(['SINCE', datetime.datetime.today(), 'FROM', HERMES_EMAIL])
print("{}: {} messages from our best friend".format(self.login, len(messages))) print("{}: {} messages from our best friend".format(self.login, len(messages)))
if len(messages) == 0: if len(messages) == 0:
@@ -236,18 +315,11 @@ class MailReader:
try: try:
email_message = email.message_from_bytes(message_data[b'RFC822']) email_message = email.message_from_bytes(message_data[b'RFC822'])
# 获取发件人和主题 from_address = email_message.get('FROM') or ""
from_address = email_message.get('FROM') subject = email_message.get('subject') or ""
subject = email_message.get('subject')
# 检查是否是Hermes邮件 # 提取邮件正文(委托给 imap_proxy_reader.extract_body
hermes_mail_address = "no-reply@hermes.com" body = extract_body(email_message)
if (hermes_mail_address in from_address or
"outlook.com" in from_address or
"hotmail" in from_address):
# 提取邮件正文
body = self._extract_body_for_imapclient(email_message)
# 检查是否是预约验证邮件 # 检查是否是预约验证邮件
if (VALIDATION_URL_SUBJECT_FR in subject or if (VALIDATION_URL_SUBJECT_FR in subject or
@@ -263,7 +335,6 @@ class MailReader:
mail.isImapClient = True mail.isImapClient = True
print("email is {}".format(self.login)) print("email is {}".format(self.login))
print("body is {}".format(body))
print("subject is {}".format(subject)) print("subject is {}".format(subject))
# 设置收件人地址 # 设置收件人地址
@@ -280,24 +351,6 @@ class MailReader:
return mail_messages return mail_messages
def _extract_body_for_imapclient(self, email_message: Message) -> str:
"""提取IMAPClient邮件正文"""
body = ""
for part in email_message.walk():
content_type = part.get_content_type()
if content_type == "text/html":
payload = part.get_payload(decode=True)
if payload:
body += payload.decode("utf-8", errors="ignore")
elif content_type == "text/plain":
payload = part.get_payload()
if payload:
body += payload
return body
# 邮件处理相关函数 # 邮件处理相关函数
def find_item_by_url(url: str, successful_items) -> Union[None, ReserveResultPojo]: def find_item_by_url(url: str, successful_items) -> Union[None, ReserveResultPojo]:
@@ -334,56 +387,130 @@ def need_to_valid_url(url: str, item: Union[ReserveResultPojo, None]) -> bool:
def need_to_check_email(mail: str, successful_items) -> bool: def need_to_check_email(mail: str, successful_items) -> bool:
"""判断是否需要检查邮件""" """
判断是否需要检查邮件。
两种情况跳过:
1. 该邮箱已有成功验证记录(原逻辑)
2. 距上次读取不足 MAIL_READ_MIN_INTERVAL_MINUTES 分钟(防频繁重复登录)
"""
print("successful_items size is " + str(len(successful_items))) print("successful_items size is " + str(len(successful_items)))
# 特殊处理 # 原逻辑:已有成功验证则跳过
if mail == "saigecong1990@pissmail.com":
return True
# 过滤已验证的项目
filtered_items = [item for item in successful_items if item.email == mail] filtered_items = [item for item in successful_items if item.email == mail]
# 检查是否有已验证的项目
validated_items = [item for item in filtered_items validated_items = [item for item in filtered_items
if item.url_validated is not None and item.url_validated is True] if item.url_validated is not None and item.url_validated is True]
if len(validated_items) > 0:
return False
return len(validated_items) == 0 # 新逻辑:距上次读取时间太短则跳过
last_read = MONGO_STORE_MANAGER.get_last_mail_read_time(mail)
if last_read is not None:
elapsed_minutes = (datetime.datetime.utcnow() - last_read).total_seconds() / 60
if elapsed_minutes < MAIL_READ_MIN_INTERVAL_MINUTES:
print("[跳过] {} 距上次读取仅 {:.1f} 分钟,未达到最小间隔 {} 分钟".format(
mail, elapsed_minutes, MAIL_READ_MIN_INTERVAL_MINUTES))
return False
return True
def find_links_to_validate_from_mail_list(mail_list: List[MailAddress], logger) -> None: def find_links_to_validate_from_mail_list(
"""从邮件列表中查找需要验证的链接""" mail_list: List[MailAddress],
logger,
proxy: Optional[ProxyConfig] = None,
proxy_pool: Optional[List[ProxyConfig]] = None,
) -> List[str]:
"""
从邮件列表中查找需要验证的链接,返回读取失败的GMX账户列表。
参数
----
proxy : 单一代理(GMX专用,兼容旧调用方式)
proxy_pool : 代理列表(非GMX账号也会轮换使用;若为空则非GMX走直连)
"""
if not mail_list: if not mail_list:
return return []
# 检查时间前开始检查邮件
contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list() contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list()
successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
mails_messages = [] mails_messages = []
failed_gmx: List[str] = []
# 使用线程池处理邮件 # ── 按域名分组,每组使用独立线程池限流 ────────────────────────────
with ThreadPoolExecutor(max_workers=100) as executor: # domain_group → [(MailAddress, ProxyConfig|None), ...]
futures = [] grouped: Dict[str, List[tuple]] = defaultdict(list)
for mail in mail_list: for idx, mail in enumerate(mail_list):
# 检查是否需要读取邮件 if not need_to_check_email(mail.mail, successful_items):
if need_to_check_email(mail.mail, successful_items): continue
mail_reader = MailReader(mail.mail, mail.password)
# 为账号分配代理
if is_proxy_account(mail.mail):
# Proxy-routed account (PROXY_DOMAINS: GMX / yahoo.com) → use the dedicated GMX proxy
assigned_proxy = proxy
elif proxy_pool:
# 非GMX + 有代理池 → 按索引轮换分配
assigned_proxy = proxy_pool[idx % len(proxy_pool)]
else:
# 无代理池 → 直连
assigned_proxy = None
group_key = get_domain_group(mail.mail)
grouped[group_key].append((mail, assigned_proxy))
# ── 每个域名分组启动独立线程池 ────────────────────────────────────
# future → mail address,用于进度显示
future_to_mail: Dict[object, str] = {}
executors = []
for group_key, items in grouped.items():
max_w = MAX_WORKERS_PER_DOMAIN.get(group_key, MAX_WORKERS_PER_DOMAIN["default"])
executor = ThreadPoolExecutor(max_workers=max_w)
executors.append(executor)
print("[限流] 域名组 '{}': {} 账号,max_workers={}".format(
group_key, len(items), max_w))
for mail, assigned_proxy in items:
mail_reader = MailReader(
mail.mail,
mail.password,
proxy=assigned_proxy,
failed_gmx_list=failed_gmx,
)
future = executor.submit(mail_reader.read_emails, mails_messages) future = executor.submit(mail_reader.read_emails, mails_messages)
futures.append(future) future_to_mail[future] = mail.mail
# 等待所有任务完成 # ── 等待所有任务完成,然后关闭线程池 ─────────────────────────────
for future in futures: total = len(future_to_mail)
completed = 0
for future in as_completed(future_to_mail):
mail_addr = future_to_mail[future]
completed += 1
try: try:
future.result() future.result(timeout=FUTURE_TIMEOUT)
print("[进度] {}/{} {}".format(completed, total, mail_addr))
except TimeoutError:
print("[进度] {}/{} {} — Timeout ({} s), lecture ignorée.".format(
completed, total, mail_addr, FUTURE_TIMEOUT))
except Exception as e: except Exception as e:
print("Error processing mail: {}".format(e)) print("[进度] {}/{} {} — Erreur: {}".format(completed, total, mail_addr, e))
# 刷新成功的项目 for executor in executors:
executor.shutdown(wait=False)
# ── 输出代理账号读取摘要 ──────────────────────────────────────────
if failed_gmx:
print("\n[Proxy] ⚠️ {} compte(s) non lus (GMX / inbox.lv) :".format(len(failed_gmx)))
for addr in failed_gmx:
print("{}".format(addr))
else:
print("\n[Proxy] ✅ Tous les comptes GMX / inbox.lv ont été lus avec succès.")
# ── 处理邮件中的验证链接 ──────────────────────────────────────────
_refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day() _refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
# 处理邮件中的链接
for mail in mails_messages: for mail in mails_messages:
match = re.search(VALIDATION_URL_REGEX, mail.body) match = re.search(VALIDATION_URL_REGEX, mail.body)
if match: if match:
@@ -402,18 +529,29 @@ def find_links_to_validate_from_mail_list(mail_list: List[MailAddress], logger)
url, url,
mail.to_address, mail.to_address,
model=_model, model=_model,
_all_contact_list=contact_to_book_list, _used_ip= _used_ip) _all_contact_list=contact_to_book_list, _used_ip=_used_ip)
else: else:
logger.info("do not need to click url --> {}".format(mail.mail_address)) logger.info("do not need to click url --> {}".format(mail.mail_address))
return failed_gmx
# 主函数 # 主函数
if __name__ == '__main__': if __name__ == '__main__':
# 读取联系人列表 # 读取联系人列表
contact_to_book_list = read_contacts( contact_to_book_list = read_contacts(
file_name="~/Desktop/contact_list_2025-11-28.xlsx") # file_name="~/Desktop/contact_list_inbox_lv_100.xlsx")
# file_name="~/Desktop/contact_list_all.xlsx") # file_name="~/Desktop/contact_list_2026-04-21_200_yahoo.xlsx")
# file_name="~/Desktop/contact_list_2025-11-06.xlsx") # file_name="~/Desktop/contact_list_yahoo_100_20_04.xlsx")
# file_name="~/Desktop/contact_yahoo_5.xlsx")
# file_name="~/Desktop/contact_list_2026-04-24_yahoo_50.xlsx")
file_name="~/Desktop/contact_list_2026-04-23.xlsx")
# file_name="~/Desktop/contact_list_2026-04-11.xlsx")
# file_name="~/Desktop/contact_list_2026-04-17.xlsx")
# file_name="~/Desktop/contact_list_inbox_100_14_04.xlsx")
# file_name="~/Desktop/contact_list_2024-09-02_firemail_de_100.xlsx")
# file_name="~/Desktop/reste_inbox_lv.xlsx")
# file_name="~/Desktop/yahooo_list.xlsx")
# 获取目标邮箱列表 # 获取目标邮箱列表
all_mail_list = MONGO_STORE_MANAGER.get_destination_emails() all_mail_list = MONGO_STORE_MANAGER.get_destination_emails()
@@ -427,7 +565,6 @@ if __name__ == '__main__':
# 设置日志记录器 # 设置日志记录器
logger = logging.getLogger() logger = logging.getLogger()
# 获取已验证的链接列表 # 获取已验证的链接列表
_all_links = MONGO_STORE_MANAGER.get_links_to_validate() _all_links = MONGO_STORE_MANAGER.get_links_to_validate()
@@ -440,6 +577,52 @@ if __name__ == '__main__':
_to_add = False _to_add = False
if _to_add: if _to_add:
filter_mail.append(mail_pojo) filter_mail.append(mail_pojo)
# filter_mail = [MailAddress("utatapi@gmx.net", "RSAzHAFek8s")] # filter_mail = [MailAddress("pishikmamn@gmx.de", "53OBns2jAXE")]
# ── Mode de lecture : GMX_ONLY=true → uniquement les comptes GMX ──
gmx_only = os.environ.get("GMX_ONLY", "false").strip().lower() == "true"
if gmx_only:
filter_mail = [m for m in filter_mail if is_gmx_account(m.mail)]
print("[Mode] Lecture GMX uniquement ({} comptes)".format(len(filter_mail)))
else:
print("[Mode] Lecture de tous les comptes ({} comptes)".format(len(filter_mail)))
# 配置代理(GMX账号必须通过代理读取)
gmx_proxy = ProxyConfig(
host=os.environ.get("GMX_PROXY_HOST", ""),
port=int(os.environ.get("GMX_PROXY_PORT", "443")),
proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
username=os.environ.get("GMX_PROXY_USERNAME"),
password=os.environ.get("GMX_PROXY_PASSWORD"),
)
# 非GMX账号代理池(可配置多个,轮换使用;留空则直连)
# 格式:PROXY_POOL_HOSTS="host1:port1,host2:port2",与 GMX_PROXY 同类型
_proxy_pool_raw = os.environ.get("PROXY_POOL_HOSTS", "").strip()
non_gmx_proxy_pool: Optional[List[ProxyConfig]] = None
if _proxy_pool_raw:
non_gmx_proxy_pool = []
for entry in _proxy_pool_raw.split(","):
entry = entry.strip()
if ":" in entry:
_h, _p = entry.rsplit(":", 1)
non_gmx_proxy_pool.append(ProxyConfig(
host=_h,
port=int(_p),
proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
username=os.environ.get("GMX_PROXY_USERNAME"),
password=os.environ.get("GMX_PROXY_PASSWORD"),
))
# 处理邮件 # 处理邮件
find_links_to_validate_from_mail_list(filter_mail, logger) failed = find_links_to_validate_from_mail_list(
filter_mail, logger, proxy=gmx_proxy, proxy_pool=non_gmx_proxy_pool
)
# ── Afficher les comptes GMX non lus ─────────────────────
if failed:
print("\n===== Comptes GMX non lus ({}) =====".format(len(failed)))
for addr in failed:
print("{}".format(addr))
else:
print("\n===== Tous les comptes GMX ont été lus avec succès =====")
+4 -3
View File
@@ -1,12 +1,13 @@
from multiprocessing import Process from multiprocessing import Process
from queue_message.CookiesPublisher import MORNING_DATA_CACHE_2, MORNING_DATA_CACHE, MORNING_DATA_CACHE_BAK from queue_message.CookiesPublisher import MORNING_DATA_CACHE_2, MORNING_DATA_CACHE, MORNING_DATA_CACHE_BAK, \
REGISTER_QUEUE
from workers.MessagerTransporter import migrate_message_to_queue from workers.MessagerTransporter import migrate_message_to_queue
if __name__ == '__main__': if __name__ == '__main__':
p1 = Process(target=migrate_message_to_queue, args=(MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK)) p1 = Process(target=migrate_message_to_queue, args=(MORNING_DATA_CACHE, MORNING_DATA_CACHE_BAK))
p1.start() p1.start()
p2 = Process(target=migrate_message_to_queue, args=(MORNING_DATA_CACHE, MORNING_DATA_CACHE_BAK)) p2 = Process(target=migrate_message_to_queue, args=(MORNING_DATA_CACHE_2, MORNING_DATA_CACHE_BAK))
p2.start() p2.start()
p2.join() p2.join()
# migrate_message_to_queue(from_queue=MORNING_DATA_CACHE_2) # migrate_message_to_queue(from_queue=MORNING_DATA_CACHE_2)
+1 -1
View File
@@ -93,7 +93,7 @@ class ProxyManager:
return [FR_PROXY_RES_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES, FR_ASOCKS_MOBILE_PROXY] return [FR_PROXY_RES_OXY, FR_PROXY_ASOCK_RES_2, FR_DATA_IMPULSE_RES, FR_ASOCKS_MOBILE_PROXY]
def get_proxy_for_appointment_request(self) -> dict: def get_proxy_for_appointment_request(self) -> dict:
# return self.get_random_sticky_iproyal_proxy() return self.get_random_sticky_iproyal_proxy()
_chosen_proxy = random.choice(MOBILE_PROXY_LIST) _chosen_proxy = random.choice(MOBILE_PROXY_LIST)
if "oxylabs" in _chosen_proxy["http"]: if "oxylabs" in _chosen_proxy["http"]:
self.logger.info("use oxylabs proxy") self.logger.info("use oxylabs proxy")
+14 -6
View File
@@ -99,9 +99,17 @@ def send_request_for_file_list(file_list: list, thread_number: int = 20, data_qu
if __name__ == '__main__': if __name__ == '__main__':
# file_list = ['~/Desktop/contact_list_2024-05-23.xlsx', # file_list = ['~/Desktop/contact_list_2024-05-23.xlsx',
# '~/Desktop/contact_list_2024-05-21.xlsx', # '~/Desktop/contact_list_2024-05-21.xlsx',
# file_list = ['~/Desktop/contact_list_2025-10-30.xlsx'] # file_list = ['~/Desktop/contact_list_2026-04-15.xlsx']
file_list = ['~/Desktop/contact_list_2025-11-28.xlsx'] # file_list = ['~/Desktop/contact_yahoo_5.xlsx']
# file_list = ['~/Desktop/contact_list_2025-11-06.xlsx'] # file_list = ['~/Desktop/contact_list_inbox_lv_100.xlsx']
# file_list = ['~/Desktop/contact_list_all.xlsx'] # file_list = ['~/Desktop/contact_list_yahoo_100_20_04.xlsx']
send_request_for_file_list(file_list=file_list, thread_number=20, # file_list = ['~/Desktop/contact_list_2026-04-21_200_yahoo.xlsx']
data_queue_name=MORNING_DATA_CACHE_2, stop_at_hour=19, stop_at_mins=50) file_list = ['~/Desktop/contact_list_2026-04-23.xlsx']
# file_list = ['~/Desktop/contact_list_2026-04-24_yahoo_50.xlsx']
# file_list = ['~/Desktop/reste_inbox_lv.xlsx']
# file_list = ['~/Desktop/contact_list_2024-09-02_firemail_de_100.xlsx']
# file_list = ['~/Desktop/contact_list_inbox_100_14_04.xlsx']
# file_list = file_list[0:100]
# file_list = ['~/Desktop/contact_list_2026-03-28.xlsx']
send_request_for_file_list(file_list=file_list, thread_number=2,
data_queue_name=MORNING_DATA_CACHE, stop_at_hour=19, stop_at_mins=50)
+3
View File
@@ -3,3 +3,6 @@ curl_cffi==0.7.1
openpyxl openpyxl
pika pika
schedule schedule
python-dotenv
PySocks
imapclient
+41 -13
View File
@@ -10,21 +10,36 @@ from db.mongo_manager import MONGO_DB_URL
MONGO_HOST = "mongo.lpaconsulting.fr" MONGO_HOST = "mongo.lpaconsulting.fr"
MONGO_PORT = "27017" MONGO_PORT = "27017"
MONGO_DB_NAME = "appointment" # 你要备份/恢复的数据库名 MONGO_DB_NAME = "appointment" # 你要备份/恢复的数据库名
MONGO_USER = "appointment" # 如果没有密码,保持为空字符串 ""
MONGO_PASS = "Rdv@2022" # 如果没有密码,保持为空字符串 "" # Get MongoDB credentials from environment variables
MONGO_USER = os.getenv(
"MONGO_USER", "appointment"
) # Default to 'appointment' if not set
MONGO_PASS = os.getenv("MONGO_PASS", "Rdv@2022") # Default to 'Rdv@2022' if not set
# 备份存放的根目录 # 备份存放的根目录
BACKUP_DIR_ROOT = "./mongo_backups" BACKUP_DIR_ROOT = "./mongo_backups"
# =========================================== # ===========================================
def get_auth_args(): def get_auth_args():
"""构建认证参数列表""" """构建认证参数列表"""
args = [] args = []
if MONGO_USER and MONGO_PASS: if MONGO_USER and MONGO_PASS:
args.extend(["--username", MONGO_USER, "--password", MONGO_PASS, "--authenticationDatabase", "appointment"]) args.extend(
[
"--username",
MONGO_USER,
"--password",
MONGO_PASS,
"--authenticationDatabase",
"appointment",
]
)
return args return args
def backup_mongo(): def backup_mongo():
"""执行备份操作""" """执行备份操作"""
# 1. 创建带有时间戳的备份文件夹 # 1. 创建带有时间戳的备份文件夹
@@ -40,10 +55,14 @@ def backup_mongo():
# 命令格式: mongodump --host <host> --port <port> --db <db> --out <path> [auth] # 命令格式: mongodump --host <host> --port <port> --db <db> --out <path> [auth]
cmd = [ cmd = [
"mongodump", "mongodump",
"--host", MONGO_HOST, "--host",
"--port", MONGO_PORT, MONGO_HOST,
"--db", MONGO_DB_NAME, "--port",
"--out", backup_path MONGO_PORT,
"--db",
MONGO_DB_NAME,
"--out",
backup_path,
] ]
# 添加认证参数 # 添加认证参数
@@ -61,6 +80,7 @@ def backup_mongo():
print(f" 错误信息: {e.stderr}") print(f" 错误信息: {e.stderr}")
return None return None
def restore_mongo(backup_source_path): def restore_mongo(backup_source_path):
""" """
执行恢复操作 执行恢复操作
@@ -72,7 +92,9 @@ def restore_mongo(backup_source_path):
target_dir = os.path.join(backup_source_path, MONGO_DB_NAME) target_dir = os.path.join(backup_source_path, MONGO_DB_NAME)
if not os.path.exists(target_dir): if not os.path.exists(target_dir):
print(f"[-] 错误: 在路径 {backup_source_path} 下找不到数据库 {MONGO_DB_NAME} 的备份文件。") print(
f"[-] 错误: 在路径 {backup_source_path} 下找不到数据库 {MONGO_DB_NAME} 的备份文件。"
)
return return
print(f"[*] 开始恢复数据库: {MONGO_DB_NAME}{target_dir} ...") print(f"[*] 开始恢复数据库: {MONGO_DB_NAME}{target_dir} ...")
@@ -81,11 +103,14 @@ def restore_mongo(backup_source_path):
# 命令格式: mongorestore --host <host> --port <port> --db <db> <path_to_bson_files> [auth] # 命令格式: mongorestore --host <host> --port <port> --db <db> <path_to_bson_files> [auth]
cmd = [ cmd = [
"mongorestore", "mongorestore",
"--host", MONGO_HOST, "--host",
"--port", MONGO_PORT, MONGO_HOST,
"--db", MONGO_DB_NAME, "--port",
MONGO_PORT,
"--db",
MONGO_DB_NAME,
"--drop", # 警告:这会在恢复前删除现有集合,确保数据干净。根据需要移除此项。 "--drop", # 警告:这会在恢复前删除现有集合,确保数据干净。根据需要移除此项。
target_dir target_dir,
] ]
cmd.extend(get_auth_args()) cmd.extend(get_auth_args())
@@ -98,6 +123,7 @@ def restore_mongo(backup_source_path):
print(f"[-] 恢复失败: {e}") print(f"[-] 恢复失败: {e}")
print(f" 错误信息: {e.stderr}") print(f" 错误信息: {e.stderr}")
# ================= 主程序入口 ================= # ================= 主程序入口 =================
if __name__ == "__main__": if __name__ == "__main__":
print("请选择操作:") print("请选择操作:")
@@ -124,7 +150,9 @@ if __name__ == "__main__":
try: try:
idx_choice = int(input("\n请选择要恢复的备份编号: ")) - 1 idx_choice = int(input("\n请选择要恢复的备份编号: ")) - 1
if 0 <= idx_choice < len(backups): if 0 <= idx_choice < len(backups):
selected_backup = os.path.join(BACKUP_DIR_ROOT, backups[idx_choice]) selected_backup = os.path.join(
BACKUP_DIR_ROOT, backups[idx_choice]
)
restore_mongo(selected_backup) restore_mongo(selected_backup)
else: else:
print("[-] 无效的选择。") print("[-] 无效的选择。")