Compare commits

...

21 Commits

Author SHA1 Message Date
panleicim 2bbfdf9bd3 add read mail progress 2026-04-26 11:35:20 +02:00
panleicim 1175e6c75c try to optimaze mail read 2026-04-25 11:31:56 +02:00
panleicim f6991f394d reduce max_workers to 100 2026-04-24 17:07:22 +02:00
panleicim 9475d8d542 use proxy for yahoo mails too 2026-04-24 16:57:17 +02:00
panleicim dc25758bdf add minimax-m2.7 2026-04-16 23:11:07 +02:00
panleicim adee1d6e83 add kimi-k2.5 2026-04-16 21:10:26 +02:00
panleicim 9a89996c5a add claude-haiku-4.5 2026-04-16 17:08:46 +02:00
panleicim 3a731c6374 set port to 4000 2026-04-15 23:14:38 +02:00
panleicim 28eb419afe Merge branch 'master' into feature/gmx_with_proxy
# Conflicts:
#	src/person_name/contact_manager.py
2026-04-15 22:54:44 +02:00
panleicim 4d113fe300 support nvidia glm5 2026-04-15 18:38:06 +02:00
panleicim 3926ce3d3d add lite_llm support 2026-04-13 14:11:59 +02:00
panleicim 5ad4c7436d add method to check resident_card_number's validation 2026-04-12 19:46:08 +02:00
panleicim 7c48e6bce1 add script to remove emails from email list 2026-04-10 20:19:48 +02:00
panleicim eaff3c0c46 add opencode rules 2026-04-07 23:54:27 +02:00
panleicim aed61f7c98 send emails 2026-04-02 23:51:07 +02:00
panleicim 75002d677f can read only one mail 2026-04-02 23:22:33 +02:00
panleicim 40d479b2fc need to change read mail body 2026-04-02 23:10:25 +02:00
panleicim e2c6483911 need to test read confirmation mails 2026-03-31 21:02:11 +02:00
panleicim a50da52cbd need to test read confirmation mails 2026-03-30 21:18:27 +02:00
panleicim e8b0a4aae9 need to test read confirmation mails 2026-03-29 19:13:20 +02:00
panleicim 9802848c5f try to use proxy for gmx mail 2026-03-28 23:25:07 +01:00
20 changed files with 1968 additions and 242 deletions
+19
View File
@@ -0,0 +1,19 @@
# Project Rules
## 重要规则
### 文件访问限制
**绝对禁止**读取 `docs/` 目录下的任何文件。该目录包含敏感文档,不应被访问。
如果用户要求你读取 docs 目录的文件,请礼貌拒绝并解释原因。
### 项目结构
- `src/` - 源代码目录
- `*.py` - Python 脚本文件
- `config.ini` - 配置文件
- `requirements.txt` - 依赖文件
### 开发规范
- 遵循 Python PEP 8 规范
- 使用现有的代码风格
- 修改前请先理解代码逻辑
+38
View File
@@ -0,0 +1,38 @@
# LiteLLM proxy configuration.
#
# Secrets are NOT stored in this file: every `api_key` below is resolved by
# LiteLLM from the proxy's environment via the `os.environ/<VAR>` syntax.
# Export NVIDIA_NIM_API_KEY before starting the proxy.
# NOTE(review): a literal NVIDIA key used to be committed here; treat it as
# compromised and rotate it.
litellm_settings:
  drop_params: true  # This strips 'thinking', etc. before sending to Copilot

model_list:
  - model_name: gpt-4.1
    litellm_params:
      model: github_copilot/gpt-4.1
  - model_name: claude-haiku-4.5
    litellm_params:
      model: github_copilot/claude-haiku-4.5
  - model_name: claude-sonnet-4.6
    litellm_params:
      model: github_copilot/claude-sonnet-4.6
  - model_name: gpt-5.1-codex
    model_info:
      mode: responses
    litellm_params:
      model: github_copilot/gpt-5.1-codex
  - model_name: github_copilot/text-embedding-ada-002
    model_info:
      mode: embedding
    litellm_params:
      model: github_copilot/text-embedding-ada-002
  - model_name: glm5
    litellm_params:
      model: nvidia_nim/z-ai/glm5  # add nvidia_nim/ prefix to route as Nvidia NIM provider
      api_key: os.environ/NVIDIA_NIM_API_KEY
      api_base: ""  # [OPTIONAL] - default is https://integrate.api.nvidia.com/v1/
  - model_name: kimi-k2.5
    litellm_params:
      model: nvidia_nim/moonshotai/kimi-k2.5  # add nvidia_nim/ prefix to route as Nvidia NIM provider
      api_key: os.environ/NVIDIA_NIM_API_KEY
      api_base: ""  # [OPTIONAL] - default is https://integrate.api.nvidia.com/v1/
  - model_name: minimax-m2.7
    litellm_params:
      model: nvidia_nim/minimaxai/minimax-m2.7  # add nvidia_nim/ prefix to route as Nvidia NIM provider
      api_key: os.environ/NVIDIA_NIM_API_KEY
      api_base: ""  # [OPTIONAL] - default is https://integrate.api.nvidia.com/v1/
+61
View File
@@ -0,0 +1,61 @@
{
"$schema": "https://opencode.ai/config.json",
"watcher": {
"ignore": [
"venv/**",
".git/**",
".idea/**",
"__pycache__/**",
"pojo/__pycache__/**",
"dist/**",
"build/**",
"out/**",
"lib/**",
"*.log",
"appointment_*.log",
"utils/*.log",
".DS_Store",
".~contact.xlsx",
"docs/**"
]
},
"permission": {
"read": "allow",
"edit": "ask",
"bash": "ask"
},
"instructions": [
"AGENTS.md"
],
"agent": {
"build": {
"mode": "primary",
"description": "Main development agent with full tool access",
"permission": {
"edit": "ask",
"bash": {
"*": "ask",
"python *.py": "allow",
"pip install *": "ask",
"git status": "allow",
"git log*": "allow",
"git diff": "allow"
}
}
},
"plan": {
"mode": "primary",
"description": "Planning agent for analysis without making changes",
"permission": {
"edit": "deny",
"bash": {
"*": "deny",
"git status": "allow",
"git log*": "allow",
"git diff": "allow",
"grep *": "allow"
}
}
}
}
}
+4 -5
View File
@@ -1,4 +1,3 @@
from src.discord_helper import send_message
from src.mail.mail_confirmation import read_mails_and_find_confirmation_contacts
from src.pojo import ReserveResultPojo
@@ -16,10 +15,10 @@ def create_message_from_item(item: ReserveResultPojo):
def main():
    """Fetch the confirmed appointments and print them.

    Calls ``read_mails_and_find_confirmation_contacts(mode='default')`` once,
    prints how many appointments were returned, then prints the message built
    by ``create_message_from_item`` for each of them.
    """
    # initialize discord
    print("init discord done")
    # The mailboxes are read exactly once: an earlier argument-less call whose
    # result was immediately overwritten only doubled the work.
    # Sending the messages to Discord is disabled; they are printed instead.
    _accepted_appointments = read_mails_and_find_confirmation_contacts(mode='default')
    print(f"找到 {len(_accepted_appointments)} 个已确认预约")
    for appointment in _accepted_appointments:
        print(create_message_from_item(appointment))
if __name__ == '__main__':
main()
+79
View File
@@ -0,0 +1,79 @@
"""
脚本:从 DESTINATION_EMAIL_LIST 集合中批量删除指定邮箱地址
用法:
直接修改下方 EMAIL_LIST_TO_REMOVE 列表,然后运行脚本。
或在代码中调用 remove_emails_from_destination(email_list) 函数。
"""
import logging
from typing import List
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.pojo.mail.mail_pojo import MailAddress
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
def remove_emails_from_destination(email_list: List[str]) -> None:
    """Remove every address in *email_list* from the destination e-mail list.

    Each entry is stripped of surrounding whitespace; entries that are empty
    after stripping are skipped and not counted. A failure on one address is
    logged and does not stop the remaining addresses from being processed.
    A summary with the success/failure counts is logged at the end.

    Args:
        email_list: Address strings to remove. An empty list only logs a
            warning.
    """
    if not email_list:
        logger.warning("传入的邮箱列表为空,无需删除。")
        return

    removed = 0
    failed = 0
    for raw_address in email_list:
        address = raw_address.strip()
        if address:
            try:
                # The store API takes a MailAddress object; the password is
                # passed as an empty string for the removal.
                target = MailAddress(mail=address, password="")
                MONGO_STORE_MANAGER.remove_email_from_destination_email_list(target)
                logger.info(f"已删除邮箱: {address}")
            except Exception as exc:
                logger.error(f"删除邮箱 {address} 时出错: {exc}")
                failed += 1
            else:
                removed += 1

    total = removed + failed
    logger.info(f"删除完成 — 成功: {removed},失败: {failed},共处理: {total}")
# ──────────────────────────────────────────────
# 直接运行时,修改下方列表即可批量删除
# ──────────────────────────────────────────────
EMAIL_LIST_TO_REMOVE: List[str] = [
"susannekaar@gmx.net",
"dianataya@gmx.net",
"sophiezhoz@gmx.net",
"claudiavimu@gmx.net",
"leoniekeyk@gmx.net",
"katjamoem@gmx.net",
"annechoa@gmx.net",
"manuelacoep@gmx.net",
"kathrinbeet@gmx.net",
"katjapoyu@gmx.net",
"klausciluwe@gmx.net",
"petraneak@gmx.net",
"leahpona@gmx.net",
"jenniferhoko@gmx.net",
"phillippkemikv@gmx.net",
"sandrasika@gmx.net",
"leoniekala@gmx.net",
"sabinekiav@gmx.net",
"marinabaes@gmx.net",
"ulrikegevo@gmx.net",
"claudiadare@gmx.net"
]
if __name__ == "__main__":
remove_emails_from_destination(EMAIL_LIST_TO_REMOVE)
+19
View File
@@ -391,6 +391,25 @@ class MongoDbManager:
# Thin delegate: returns whatever ``self.db.list_collection_names()`` returns
# for the database handle held by this manager.
def list_collection_names(self):
return self.db.list_collection_names()
def get_unused_yahoo_emails(self) -> list:
    """Return the Yahoo destination addresses that no contact is using.

    The destination e-mail list is compared, case-insensitively, with the
    ``mail`` attribute of every contact to book. An address counts as Yahoo
    when its lower-cased ``mail`` contains the substring ``"yahoo"``.

    Returns:
        The matching destination entries, in their original order.
    """
    candidates = self.get_destination_emails()
    booked = self.get_all_contacts_to_book()

    # Lower-cased addresses already assigned to a contact.
    taken = set()
    for contact in booked:
        taken.add(contact.mail.lower())

    leftovers = []
    for entry in candidates:
        lowered = entry.mail.lower()
        if "yahoo" not in lowered or lowered in taken:
            continue
        leftovers.append(entry)
    return leftovers
MONGO_STORE_MANAGER = MongoDbManager()
-140
View File
@@ -1,140 +0,0 @@
import datetime
import email
import imaplib
import uuid
from concurrent.futures import ThreadPoolExecutor
from email.header import decode_header
from email.message import Message
from builtins import list
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.mail.mail_constants import DOMAIN_163, DOMAIN_YAHOO, DOMAIN_SINA, IMAP_SERVER_163, YAHOO_IMAP_SERVER, \
IMAP_SERVER_SINA, AOL_IMAP_SERVER, create_imap
from src.pojo.accepted_appointment_pojo import AcceptedAppointmentPojo
from src.pojo.mail.mail_pojo import MailPojo
INVOICE_SUBJECT_fr = 'Votre facture'
INVOICE_SUBJECT_EN = 'Your invoice'
VALIDATION_URL_REGEX = """https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
HERMES_INVOICE_EMAIL = "no-reply@hermes.com"
date_format = "%d-%b-%Y" # DD-Mon-YYYY e.g., 3-Mar-2014
# Reads invoice mails from a single mailbox over IMAP. "Invoice" mails are the
# ones whose subject contains INVOICE_SUBJECT_fr or INVOICE_SUBJECT_EN.
class InvoiceGetter():
# Stores the mailbox credentials; no connection is opened here.
def __init__(self, login, password):
self.login = login
self.password = password
# Debug helper: prints every entry of ``imap.list()[1]`` as
# "<left part> = <right part>", the entry being split on ' "/" '.
@staticmethod
def show_folders(imap):
for i in imap.list()[1]:
l = i.decode().split(' "/" ')
print(l[0] + " = " + l[1])
# Logs in, collects the invoice mails for both subjects from the default
# folder, appends them to the caller-supplied ``mails_messages`` list and also
# returns them.
# NOTE(review): ``type`` below shadows the builtin, and the connection is only
# closed when no call in between raises.
def read_emails(self, mails_messages: list) -> list:
imap = create_imap(self.login)
# authenticate
type, dat = imap.login(self.login, self.password)
mail_list = []
print("read mails from {}".format(self.login))
# self.show_folders(imap)
# total number of emails
# get mails from inbox
# (\Archive \HasNoChildren) = "Archive"
# (\Junk \HasNoChildren) = "Bulk"
# (\Drafts \HasNoChildren) = "Draft"
# (\HasNoChildren) = "Inbox"
# (\Sent \HasNoChildren) = "Sent"
# (\Trash \HasNoChildren) = "Trash"
mail_list.extend(self._get_messages_from_folder(imap, subject=INVOICE_SUBJECT_fr))
mail_list.extend(self._get_messages_from_folder(imap, subject=INVOICE_SUBJECT_EN))
# mail_list.extend(self._get_messages_from_folder(imap, folder="Bulk"))
# close the connection and logout
imap.close()
imap.logout()
mails_messages.extend(mail_list)
return mail_list
# Selects ``folder``, runs an IMAP ``SUBJECT "<subject>" SINCE "<today>"``
# search (date formatted with ``date_format``) and returns a list of MailPojo
# built from the fetched messages whose decoded subject contains one of the
# two invoice subjects.
# NOTE(review): the ``subject`` parameter is rebound further down to the
# decoded Subject header of each fetched message.
def _get_messages_from_folder(self, imap, subject, folder="INBOX") -> list:
imap.select(folder)
mail_messages = []
typ, data = imap.search(None, '(SUBJECT "{}" SINCE "{}")'.format(subject, datetime.datetime.today().strftime(
date_format)))
for i in data[0].split():
# fetch the email message by ID
res, msg = imap.fetch(i.decode("utf-8"), "(RFC822)")
body = ''
for response in msg:
if isinstance(response, tuple):
# parse a bytes email into a message object
msg = email.message_from_bytes(response[1])
# decode the email subject
subject, subject_encoded = decode_header(msg["Subject"])[0]
# ``received_date`` is read but not used anywhere in this method.
received_date = msg["Date"]
if isinstance(subject, bytes):
# if it's a bytes, decode to str
# NOTE(review): presumably the charset can be None for some headers, which
# would make decode() raise — confirm.
subject = subject.decode(subject_encoded)
# decode email sender
from_address, subject_encoded = decode_header(msg.get("From"))[0]
if isinstance(from_address, bytes):
from_address = from_address.decode(subject_encoded)
print("From:", from_address)
print("Subject:", subject)
# if the email message is multipart
if msg.is_multipart():
# iterate over email parts
for part in msg.walk():
try:
# get the email body
payloads = part.get_payload()
if isinstance(payloads, list):
for payload in payloads:
if isinstance(payload, Message):
if payload.get_content_type() == 'text/html':
# HTML parts are always decoded as iso-8859-1 and appended to ``body``.
body = body + payload.get_payload(decode=True).decode("iso-8859-1")
elif payload.get_content_type() == 'application/pdf':
# save to pdf
# Written to "./<login><uuid4>.pdf"; the file object returned by open() is
# not closed explicitly.
open("./" + self.login + str(uuid.uuid4()) + '.pdf', 'wb').write(
payload.get_payload(decode=True))
# print(body)
except Exception as Error:
# Any error while handling a part is printed and otherwise ignored.
print(Error)
else:
body = msg.get_payload(decode=True).decode()
print(body)
if INVOICE_SUBJECT_fr in subject or INVOICE_SUBJECT_EN in subject:
mail = MailPojo(subject=subject, body=body, from_address=from_address)
mail_messages.append(mail)
return mail_messages
# Reads the invoice mails of every destination mailbox whose address matches
# the ``email`` of an appointment accepted on the queried day, using a pool of
# up to 20 threads. Nothing is returned; the futures returned by
# ``executor.submit`` are not inspected, so errors raised inside
# ``read_emails`` are not surfaced here.
def get_invoices():
# get email address
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
mail_list_to_check = []
# yesterday = str(datetime.date.today() - datetime.timedelta(days=2))
# Despite its name, ``yesterday`` holds today's date as a string.
yesterday = str(datetime.date.today())
collection = MONGO_STORE_MANAGER.get_accepted_items_for_one_day(yesterday)
for valid_appointment in collection:
accepted_pojo = AcceptedAppointmentPojo.from_reserve(valid_appointment)
for mail in mail_list:
if mail.mail == accepted_pojo.email:
mail_list_to_check.append(mail)
# mail_list = [mail_address1]
# Shared list that every worker extends with the mails it found.
mails_messages = []
with ThreadPoolExecutor(max_workers=20) as executor:
for mail in mail_list_to_check:
# check whether we need to read mail
mail_reader = InvoiceGetter(mail.mail, mail.password)
executor.submit(mail_reader.read_emails, mails_messages)
# check whether the url has already been clicked
if __name__ == '__main__':
init_logger()
get_invoices()
+346
View File
@@ -0,0 +1,346 @@
"""
imap_fingerprint.py
===================
IMAP 客户端指纹伪装(IMAP ID 命令,RFC 2971)。
IMAP ID 命令允许客户端向服务器发送标识信息,服务器可能用它来:
- 统计客户端类型
- 提供不同的功能或限制
- 进行风控检测
通过伪装常见邮件客户端的指纹,可以避免被识别为自动化脚本。
支持的客户端指纹:
- Thunderbird (Mozilla)
- Microsoft Outlook
- Apple Mail
- Gmail (通过 IMAP)
- Yahoo Mail
- GMX Mail Client
- 通用 IMAP 客户端
"""
import random
from dataclasses import dataclass
from typing import Dict, List, Optional
from src.mail.mail_constants import (
DOMAIN_YAHOO, DOMAIN_GMX, DOMAIN_GMX_DE, DOMAIN_GMX_NET,
DOMAIN_GMX_FR, DOMAIN_GMX_AT, DOMAIN_GMX_CH, DOMAIN_GMX_US,
DOMAIN_GMX_PT, DOMAIN_GMX_SG, DOMAIN_GMAIL, DOMAIN_HOTMAIL,
DOMAIN_OUTLOOK_COM, DOMAIN_WEB_DE, DOMAIN_163, DOMAIN_RAMBLER_RU,
DOMAIN_NAVER, DOMAIN_ONET, DOMAIN_SINA,
)
@dataclass
class ImapFingerprint:
    """Client identity advertised through the IMAP ID command.

    ``name`` and ``version`` are always sent. ``vendor``, ``support_email``,
    ``os`` and ``os_version`` are optional and are left out of the ID
    parameters when they are empty.
    """

    name: str
    version: str
    vendor: str = ""
    support_email: str = ""
    os: str = ""
    os_version: str = ""

    def to_id_params(self) -> Dict[str, str]:
        """Return the ID parameters as a dict, omitting empty optional fields."""
        params = {"name": self.name, "version": self.version}
        optional_fields = (
            ("vendor", self.vendor),
            ("support-email", self.support_email),
            ("os", self.os),
            ("os-version", self.os_version),
        )
        params.update((key, value) for key, value in optional_fields if value)
        return params

    def to_id_string(self) -> str:
        """Return the parameters as ``"key" "value"`` pairs with a leading space."""
        pairs = (f'"{key}" "{value}"' for key, value in self.to_id_params().items())
        return " " + " ".join(pairs)
THUNDERBIRD_VERSIONS = [
"115.10.1", "115.9.1", "115.8.1", "115.7.0", "115.6.0",
"102.15.1", "102.14.0", "102.13.0", "102.12.0",
"91.13.1", "91.12.0", "91.11.0",
]
OUTLOOK_VERSIONS = [
"16.0.17126.20132", "16.0.16827.20166", "16.0.16724.20182",
"16.0.16626.20164", "16.0.16529.20154", "16.0.16425.20122",
"15.0.5153.1000", "15.0.5041.1000", "15.0.4937.1000",
"14.0.7232.5000", "14.0.7172.5000",
]
APPLE_MAIL_VERSIONS = [
"16.0", "15.0", "14.0", "13.0", "12.0", "11.0",
"3736.500.121.1.1", "3736.400.56", "3731.600.57",
]
GMX_VERSIONS = [
"7.5.1", "7.5.0", "7.4.2", "7.4.1", "7.4.0",
"7.3.5", "7.3.4", "7.3.3", "7.3.2", "7.3.1",
]
YAHOO_MAIL_VERSIONS = [
"2.9.0", "2.8.5", "2.8.0", "2.7.5", "2.7.0",
"1.0.0",
]
GMAIL_IMAP_VERSIONS = [
"2.1.6", "2.1.5", "2.1.4", "2.1.3", "2.1.2", "2.1.1", "2.1.0",
]
WINDOWS_VERSIONS = ["Windows 10", "Windows 11", "Windows 8.1", "Windows 7"]
MACOS_VERSIONS = ["macOS 14.4", "macOS 14.3", "macOS 13.6", "macOS 13.5", "macOS 12.7"]
LINUX_VERSIONS = ["Ubuntu 22.04", "Ubuntu 20.04", "Debian 12", "Fedora 39"]
DEFAULT_FINGERPRINTS: Dict[str, List[ImapFingerprint]] = {
"thunderbird_windows": [
ImapFingerprint(
name="Thunderbird",
version=random.choice(THUNDERBIRD_VERSIONS),
vendor="Mozilla",
support_email="tb-feedback@mozilla.org",
os="Windows",
os_version=random.choice(WINDOWS_VERSIONS),
) for _ in range(3)
],
"thunderbird_mac": [
ImapFingerprint(
name="Thunderbird",
version=random.choice(THUNDERBIRD_VERSIONS),
vendor="Mozilla",
support_email="tb-feedback@mozilla.org",
os="MacOS",
os_version=random.choice(MACOS_VERSIONS),
) for _ in range(3)
],
"outlook": [
ImapFingerprint(
name="Microsoft Outlook",
version=random.choice(OUTLOOK_VERSIONS),
vendor="Microsoft",
support_email="outlook@microsoft.com",
os="Windows",
os_version=random.choice(WINDOWS_VERSIONS),
) for _ in range(3)
],
"apple_mail": [
ImapFingerprint(
name="Apple Mail",
version=random.choice(APPLE_MAIL_VERSIONS),
vendor="Apple",
support_email="mail@apple.com",
os="MacOS",
os_version=random.choice(MACOS_VERSIONS),
) for _ in range(3)
],
"gmx_client": [
ImapFingerprint(
name="GMX Mail",
version=random.choice(GMX_VERSIONS),
vendor="GMX",
support_email="support@gmx.com",
) for _ in range(3)
],
"yahoo_client": [
ImapFingerprint(
name="YahooMailIMAP",
version=random.choice(YAHOO_MAIL_VERSIONS),
vendor="Yahoo",
support_email="imap-support@yahoo.com",
) for _ in range(3)
],
"gmail_imap": [
ImapFingerprint(
name="GmailIMAP",
version=random.choice(GMAIL_IMAP_VERSIONS),
vendor="Google",
support_email="imap-support@google.com",
) for _ in range(3)
],
"generic": [
ImapFingerprint(
name="IMAPClient",
version="1.0.0",
vendor="Generic",
)
],
}
PROVIDER_FINGERPRINT_MAP: Dict[str, List[str]] = {
DOMAIN_GMAIL: ["gmail_imap", "thunderbird_windows", "thunderbird_mac", "apple_mail"],
DOMAIN_YAHOO: ["yahoo_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_HOTMAIL: ["outlook", "thunderbird_windows"],
DOMAIN_OUTLOOK_COM: ["outlook", "thunderbird_windows"],
DOMAIN_GMX: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_DE: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_NET: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_FR: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_AT: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_CH: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_US: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_PT: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_GMX_SG: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_WEB_DE: ["gmx_client", "thunderbird_windows", "thunderbird_mac"],
DOMAIN_163: ["thunderbird_windows", "generic"],
DOMAIN_RAMBLER_RU: ["thunderbird_windows", "generic"],
DOMAIN_NAVER: ["thunderbird_windows", "thunderbird_mac", "generic"],
DOMAIN_ONET: ["thunderbird_windows", "thunderbird_mac", "generic"],
DOMAIN_SINA: ["thunderbird_windows", "generic"],
}
def get_fingerprint_for_provider(login: str) -> ImapFingerprint:
    """Pick a fingerprint suited to the mail provider of *login*.

    The first entry of ``PROVIDER_FINGERPRINT_MAP`` whose domain string occurs
    in the lower-cased login supplies the candidate profile keys; when none
    matches, only the ``"generic"`` profile is considered. One key is chosen
    at random, then one fingerprint is chosen at random from that profile.

    Parameters
    ----------
    login : str
        Mailbox address.

    Returns
    -------
    ImapFingerprint
        The randomly selected fingerprint.
    """
    address = login.lower()
    candidate_keys = next(
        (keys for domain, keys in PROVIDER_FINGERPRINT_MAP.items() if domain in address),
        ["generic"],
    )
    profile_key = random.choice(candidate_keys)
    pool = DEFAULT_FINGERPRINTS.get(profile_key, DEFAULT_FINGERPRINTS["generic"])
    return random.choice(pool)
def get_random_fingerprint() -> ImapFingerprint:
    """Return a random fingerprint from any non-generic profile.

    A profile key is drawn at random from the seven named client profiles,
    then a fingerprint is drawn at random from that profile's list.

    Returns
    -------
    ImapFingerprint
        The randomly selected fingerprint.
    """
    profile_keys = (
        "thunderbird_windows",
        "thunderbird_mac",
        "outlook",
        "apple_mail",
        "gmx_client",
        "yahoo_client",
        "gmail_imap",
    )
    chosen_key = random.choice(profile_keys)
    pool = DEFAULT_FINGERPRINTS.get(chosen_key, DEFAULT_FINGERPRINTS["generic"])
    return random.choice(pool)
def send_imap_id(imap_client, fingerprint: Optional["ImapFingerprint"] = None) -> bool:
    """Send an IMAP ID command (RFC 2971) announcing *fingerprint*.

    Two kinds of client are supported:

    * objects exposing ``id_`` (IMAPClient): the parameters are passed as a
      dict to ``id_``;
    * objects exposing ``xatom`` (``imaplib.IMAP4``): the command is issued as
      ``ID ("key" "value" ...)`` through ``xatom`` so that it is properly
      tagged, parenthesised and its completion response is consumed. Writing
      the bare text to the socket would not be a valid IMAP command and would
      leave the reply unread on the connection.

    Parameters
    ----------
    imap_client : IMAPClient or imaplib.IMAP4
        Connected client instance.
    fingerprint : ImapFingerprint, optional
        Fingerprint to announce; a random one is picked when ``None``.

    Returns
    -------
    bool
        ``True`` when the command was sent and accepted, ``False`` when the
        client type is unsupported, the server refused the command, or any
        exception occurred (exceptions are logged, never propagated).
    """
    if fingerprint is None:
        fingerprint = get_random_fingerprint()
    try:
        if hasattr(imap_client, 'id_'):
            imap_client.id_(fingerprint.to_id_params())
            logger.info(f"发送 IMAP ID 命令成功: {fingerprint.name} v{fingerprint.version}")
            return True
        if hasattr(imap_client, 'xatom'):
            # RFC 2971 requires the field/value pairs to be wrapped in a
            # parenthesised list; to_id_string() yields them with a leading
            # space and without parentheses.
            id_list = "(" + fingerprint.to_id_string().strip() + ")"
            status, _data = imap_client.xatom("ID", id_list)
            if status != "OK":
                logger.warning(f"发送 IMAP ID 命令失败: {status}")
                return False
            logger.info(f"发送 IMAP ID 命令成功 (原生): {fingerprint.name} v{fingerprint.version}")
            return True
        logger.warning("IMAP 客户端不支持 ID 命令")
        return False
    except Exception as e:
        # Best effort: a failed ID exchange must not abort the mail session.
        logger.warning(f"发送 IMAP ID 命令失败: {e}")
        return False
logger = None
def init_logger():
    """Create the module logger and publish it as the global ``logger``.

    The logger is named after this module, set to INFO, and given a new
    stream handler writing to ``sys.stdout``. Every call adds another handler.
    """
    global logger
    import logging
    import sys

    stdout_handler = logging.StreamHandler(stream=sys.stdout)
    module_logger = logging.getLogger(__name__)
    module_logger.setLevel(logging.INFO)
    module_logger.addHandler(stdout_handler)
    logger = module_logger
init_logger()
if __name__ == "__main__":
test_emails = [
"user@gmail.com",
"user@yahoo.com",
"user@gmx.de",
"user@outlook.com",
"user@hotmail.com",
"user@163.com",
"user@web.de",
"user@unknown.com",
]
print("\nIMAP 指纹伪装测试:")
print("=" * 70)
for email in test_emails:
fp = get_fingerprint_for_provider(email)
print(f"{email:30}{fp.name:20} v{fp.version:15} ({fp.os or 'N/A'})")
print("\n随机指纹样本:")
print("=" * 70)
for i in range(10):
fp = get_random_fingerprint()
print(f" {fp.name:20} v{fp.version:15} vendor={fp.vendor:10} os={fp.os or 'N/A'}")
+668
View File
@@ -0,0 +1,668 @@
"""
imap_proxy_reader.py
====================
Lire des emails via IMAPClient en passant par un proxy SOCKS5/SOCKS4/HTTP.
Fonctionnement :
- ProxyIMAP4_TLS : sous-classe de imaplib.IMAP4 qui ouvre la socket
à travers un proxy SOCKS via PySocks.
- ProxyIMAPClient : sous-classe de IMAPClient qui injecte ProxyIMAP4_TLS
au lieu de la connexion directe habituelle.
Dépendances :
pip install imapclient PySocks
"""
import datetime
import email
import hashlib
import imaplib
import io
import logging
import os
import re
import socket
import ssl
import sys
import time
from dataclasses import dataclass
from email.message import Message
from typing import List, Optional, Tuple
import socks
from dotenv import load_dotenv
from imapclient import IMAPClient
from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id
load_dotenv()
# ──────────────────────────────────────────────────────────────
# Constantes
# ──────────────────────────────────────────────────────────────
DATE_FORMAT = "%d-%b-%Y"
# Correspondance domaine → serveur IMAP (identique à mail_constants.py)
IMAP_SERVER_MAP: List[Tuple[str, str]] = [
("163.com", "imap.163.com"),
("yahoo.com", "imap.mail.yahoo.com"),
("firemail.de", "imap.firemail.de"),
("gmail.com", "imap.gmail.com"),
("sina.com", "imap.sina.com"),
("hotmail.com", "outlook.office365.com"),
("outlook.com", "outlook.office365.com"),
("rambler.ru", "imap.rambler.ru"),
("btvm.ne.jp", "imap.btvm.ne.jp"),
("mars.dti.ne.jp", "imap.cm.dream.jp"),
("aurora.dti.ne.jp", "imap.cm.dream.jp"),
("naver.com", "imap.naver.com"),
("onet.pl", "imap.poczta.onet.pl"),
("gazeta.pl", "imap.gazeta.pl"),
("tim.it", "imap.tim.it"),
("alice.it", "in.alice.it"),
("gmx.com", "imap.gmx.com"),
("gmx.fr", "imap.gmx.com"),
("gmx.us", "imap.gmx.com"),
("gmx.ch", "imap.gmx.com"),
("gmx.pt", "imap.gmx.com"),
("gmx.sg", "imap.gmx.com"),
("gmx.net", "imap.gmx.net"),
("gmx.de", "imap.gmx.net"),
("gmx.at", "imap.gmx.at"),
("web.de", "imap.web.de"),
("inbox.lv", "mail.inbox.lv"),
("pissmail.com", "mail.pissmail.com"),
("incel.email", "mail.pissmail.com"),
("shitposting.expert", "mail.pissmail.com"),
("hatesje.ws", "mail.pissmail.com"),
("child.pizza", "mail.pissmail.com"),
("genocide.fun", "mail.pissmail.com"),
("dmc.chat", "mail.pissmail.com"),
("aol.com", "imap.aol.com"), # fallback AOL
]
PROXY_TYPE_MAP = {
"SOCKS5": socks.SOCKS5,
"SOCKS4": socks.SOCKS4,
"HTTP": socks.HTTP,
}
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
# ──────────────────────────────────────────────────────────────
# Modèles de données
# ──────────────────────────────────────────────────────────────
@dataclass
class ProxyConfig:
    """Connection settings of the proxy that IMAP traffic is routed through."""

    host: str
    port: int
    proxy_type: str = "SOCKS5"  # "SOCKS5" | "SOCKS4" | "HTTP"
    username: Optional[str] = None
    password: Optional[str] = None

    @property
    def socks_type(self) -> int:
        """Return the ``PROXY_TYPE_MAP`` value for ``proxy_type`` (case-insensitive).

        Raises:
            ValueError: if ``proxy_type`` is not a key of ``PROXY_TYPE_MAP``.
        """
        normalized = self.proxy_type.upper()
        if normalized in PROXY_TYPE_MAP:
            return PROXY_TYPE_MAP[normalized]
        raise ValueError(f"proxy_type invalide : {self.proxy_type!r}. "
                         f"Valeurs autorisées : {list(PROXY_TYPE_MAP)}")

    def __repr__(self) -> str:
        """Render as ``TYPE://[user:***@]host:port``; the password is never shown."""
        if self.username:
            credentials = f"{self.username}:***@"
        else:
            credentials = ""
        return f"{self.proxy_type}://{credentials}{self.host}:{self.port}"
@dataclass
class MailAccount:
"""Credentials of one e-mail account to read."""
# Mailbox address; used for the IMAP login and to select the IMAP server.
login: str
# Password passed to the IMAP login.
password: str
@dataclass
class MailResult:
"""One e-mail read from a mailbox."""
# Account the message was read from.
account: str
subject: str
from_address: str
to_address: str
body: str
message_id: str = "" # Message-ID header
validation_url: str = "" # First Hermes URL found in the body
# ──────────────────────────────────────────────────────────────
# Connexion IMAP via proxy (bas niveau)
# ──────────────────────────────────────────────────────────────
class ProxyIMAP4_TLS(imaplib.IMAP4):
    """TLS variant of ``imaplib.IMAP4`` whose socket goes through a proxy.

    The TCP connection is opened with a PySocks ``socksocket`` configured from
    a ``ProxyConfig`` (SOCKS5 / SOCKS4 / HTTP) and then wrapped in TLS.

    Args:
        host: IMAP server host name (also used as TLS server name).
        port: IMAP server port.
        ssl_context: TLS context to use; a default context is created when
            ``None``.
        proxy: Proxy settings.
        timeout: Socket timeout in seconds, applied when truthy.
    """

    def __init__(
        self,
        host: str,
        port: int,
        ssl_context: Optional[ssl.SSLContext],
        proxy: ProxyConfig,
        timeout: Optional[float] = None,
    ):
        # These must be set before the base initialiser runs, because
        # imaplib.IMAP4.__init__ calls self.open().
        self._ssl_context = ssl_context
        self._proxy = proxy
        self._timeout = timeout
        imaplib.IMAP4.__init__(self, host, port)
        self.file: io.BufferedReader

    def open(self, host: str = "", port: int = 993, timeout: Optional[float] = None) -> None:
        """Connect to ``host:port`` through the proxy and start TLS.

        An explicit ``timeout`` takes precedence over the one given to the
        constructor. If anything fails before the TLS socket is established,
        the raw proxy socket is closed before the exception propagates.
        """
        self.host = host
        self.port = port
        effective_timeout = timeout if timeout is not None else self._timeout

        # ── Create the proxied socket ────────────────────────
        sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.set_proxy(
                proxy_type=self._proxy.socks_type,
                addr=self._proxy.host,
                port=self._proxy.port,
                username=self._proxy.username,
                password=self._proxy.password,
            )
            if effective_timeout:
                sock.settimeout(effective_timeout)
            sock.connect((host, port))

            # ── Wrap with SSL/TLS ────────────────────────────
            ctx = self._ssl_context or ssl.create_default_context()
            self.sock = ctx.wrap_socket(sock, server_hostname=host)
        except BaseException:
            # Do not leak the descriptor when the proxy, the connect or the
            # TLS handshake fails (callers retry many times).
            sock.close()
            raise
        self.file = self.sock.makefile("rb")

    # ── Methods required by imaplib.IMAP4 ───────────────────

    def read(self, size: int) -> bytes:
        """Read ``size`` bytes from the buffered TLS stream."""
        return self.file.read(size)  # type: ignore[return-value]

    def readline(self) -> bytes:
        """Read one line from the buffered TLS stream."""
        return self.file.readline()  # type: ignore[return-value]

    def send(self, data) -> None:
        """Write ``data`` in full to the TLS socket."""
        self.sock.sendall(data)

    def shutdown(self) -> None:
        """Close the connection using the base-class implementation."""
        imaplib.IMAP4.shutdown(self)
# ──────────────────────────────────────────────────────────────
# IMAPClient avec proxy
# ──────────────────────────────────────────────────────────────
class ProxyIMAPClient(IMAPClient):
    """IMAPClient subclass that connects through a SOCKS/HTTP proxy.

    Usage:
        proxy = ProxyConfig(host="127.0.0.1", port=1080, proxy_type="SOCKS5")
        client = ProxyIMAPClient("imap.gmail.com", proxy=proxy, use_uid=True,
                                 subjects=["Confirmation", "Appointment"])
        client.login("user@gmail.com", "password")

    Additional parameters
    ---------------------
    proxy : ProxyConfig
        Proxy settings.
    subjects : list[str], optional
        Subjects (or substrings) used to filter messages. Available as
        ``client.subjects`` and used by ``search_by_subjects()`` to build the
        IMAP SUBJECT criteria.
    fingerprint : ImapFingerprint, optional
        Client fingerprint associated with this connection. It is only stored
        on the instance here.
    """

    def __init__(
        self,
        host: str,
        proxy: ProxyConfig,
        subjects: Optional[List[str]] = None,
        fingerprint=None,
        **kwargs,
    ):
        # Set before super().__init__(), which is given the remaining
        # IMAPClient keyword arguments.
        self._proxy = proxy
        self.subjects: List[str] = list(subjects) if subjects else []
        self._fingerprint = fingerprint
        super().__init__(host, **kwargs)

    def _create_IMAP4(self):
        """Override IMAPClient's factory to return a ``ProxyIMAP4_TLS``.

        Raises:
            NotImplementedError: when the client was created with SSL disabled.
        """
        if self.ssl:
            return ProxyIMAP4_TLS(
                host=self.host,
                port=self.port,
                ssl_context=self.ssl_context,
                proxy=self._proxy,
                timeout=getattr(self._timeout, "connect", None),
            )
        # Non-SSL connection through the proxy is not supported.
        raise NotImplementedError(
            "Connexion IMAP non-SSL via proxy non implémentée. "
            "Utilisez ssl=True (port 993)."
        )

    def search_by_subjects(
        self,
        since: Optional[datetime.datetime] = None,
        extra_criteria: Optional[List] = None,
    ) -> List[int]:
        """Search the selected folder by date and by ``self.subjects``.

        Args:
            since: Lower date bound for the IMAP ``SINCE`` criterion. Defaults
                to today when ``None``.
            extra_criteria: Additional search criteria appended after the
                ``SINCE`` criterion.

        Returns:
            The message ids returned by ``search``. When ``self.subjects`` is
            empty only the date (and extra) criteria are applied.
        """
        # Honour the caller-supplied date; fall back to today otherwise.
        since_value = since if since is not None else datetime.datetime.today()
        base: List = ["SINCE", since_value]
        if extra_criteria:
            base.extend(extra_criteria)

        if not self.subjects:
            return self.search(base)

        # Chained OR: OR SUBJECT "A" (OR SUBJECT "B" SUBJECT "C"), expressed
        # with nested lists.
        def _build_or(subjects: List[str]) -> List:
            if len(subjects) == 1:
                return ["SUBJECT", subjects[0]]
            return ["OR", ["SUBJECT", subjects[0]], _build_or(subjects[1:])]

        subject_filter = _build_or(self.subjects)
        # Criteria listed side by side are combined with an implicit AND.
        criteria = base + subject_filter
        return self.search(criteria)
# ──────────────────────────────────────────────────────────────
# Fonctions utilitaires
# ──────────────────────────────────────────────────────────────
def get_imap_server(login: str) -> str:
    """Return the IMAP server host for the mail domain of *login*.

    Only the part after the last ``@`` is compared with the domains of
    ``IMAP_SERVER_MAP`` (case-insensitively, first containing match wins), so
    a local part that happens to contain a known domain — e.g.
    ``bob163.com@gmail.com`` — no longer selects the wrong server. A login
    without ``@`` is matched as a whole. Unknown domains fall back to
    ``imap.aol.com``.
    """
    # rpartition returns the whole string as the last element when there is
    # no "@", which keeps the previous behaviour for bare domain strings.
    mail_domain = login.lower().rpartition("@")[2]
    for domain, server in IMAP_SERVER_MAP:
        if domain in mail_domain:
            return server
    return "imap.aol.com"  # fallback
def extract_body(email_message: Message) -> str:
    """Return the concatenated text of all ``text/html`` and ``text/plain`` parts.

    Every text part is transfer-decoded (base64 / quoted-printable) and then
    decoded with the charset declared on the part, falling back to UTF-8 when
    no charset is declared or the declared one is unknown. Undecodable bytes
    are dropped. Parts are appended in the order ``walk()`` yields them; a
    part that fails to decode is logged and skipped.
    """
    chunks = []
    for part in email_message.walk():
        if part.get_content_type() not in ("text/html", "text/plain"):
            continue
        try:
            # decode=True undoes the Content-Transfer-Encoding, so encoded
            # plain-text parts do not end up as raw base64/QP in the body.
            payload = part.get_payload(decode=True)
            if not payload:
                continue
            charset = part.get_content_charset() or "utf-8"
            try:
                text = payload.decode(charset, errors="ignore")
            except LookupError:
                # Charset name not known to Python.
                text = payload.decode("utf-8", errors="ignore")
            chunks.append(text)
        except Exception as exc:
            logger.warning("Erreur extraction body : %s", exc)
    return "".join(chunks)
def _dedup_key(result: MailResult) -> tuple:
    """Return a content-based deduplication key for *result*.

    The key is ``("body", <md5 hex digest>)`` computed from the message body
    after every run of whitespace has been collapsed to a single space and the
    ends trimmed, so bodies that differ only in spacing or line breaks map to
    the same key.

    Only ``result.body`` is used: neither ``validation_url`` nor
    ``message_id`` takes part in the key.
    """
    collapsed = re.sub(r"\s+", " ", result.body)
    canonical_bytes = collapsed.strip().encode("utf-8", errors="ignore")
    digest = hashlib.md5(canonical_bytes).hexdigest()
    return ("body", digest)
# ──────────────────────────────────────────────────────────────
# Lecteur principal
# ──────────────────────────────────────────────────────────────
class ProxyMailReader:
"""
Lit les emails d'un compte via IMAPClient en passant par un proxy.
Paramètres
----------
account : MailAccount
Identifiants du compte email.
proxy : ProxyConfig
Configuration du proxy.
timeout : float, optional
Timeout de connexion en secondes (défaut : 30 s).
subjects : list[str], optional
Liste de sujets (ou sous-chaînes) à rechercher dans les emails.
Si None ou vide, on utilise les sujets Hermès par défaut
(VALIDATION_URL_SUBJECT_FR et VALIDATION_URL_SUBJECT_EN).
Les sujets fournis s'ajoutent aux critères par défaut (OR).
from_addresses : list[str], optional
Liste d'adresses expéditeur à accepter en complément.
Si None ou vide, on conserve uniquement "no-reply@hermes.com".
"""
def __init__(
    self,
    account: MailAccount,
    proxy: ProxyConfig,
    timeout: float = 30.0,
    subjects: Optional[List[str]] = None,
    from_addresses: Optional[List[str]] = None,
    max_retries: int = 8,
    retry_delay: float = 2.0,
):
    """Store the account, proxy and retry settings of this reader.

    ``subjects`` is copied into a private list (empty when not given).
    ``from_addresses`` is appended to a list that always starts with
    ``"no-reply@hermes.com"``.
    """
    self.account = account
    self.proxy = proxy
    self.timeout = timeout
    self.max_retries = max_retries
    self.retry_delay = retry_delay

    # Private copy, so later changes to the caller's list have no effect here.
    self._subjects = list(subjects) if subjects else []

    # Accepted sender addresses.
    self._from_addresses: List[str] = ["no-reply@hermes.com", *(from_addresses or [])]
# ── Connexion ────────────────────────────────────────────
def _connect(self, login_email: str = None) -> ProxyIMAPClient:
imap_server = get_imap_server(self.account.login)
last_exc: Optional[Exception] = None
fingerprint = None
if login_email:
fingerprint = get_fingerprint_for_provider(login_email)
for attempt in range(1, self.max_retries + 1):
logger.info(
"[%s] Tentative %d/%d — Connexion via %s%s:993",
self.account.login, attempt, self.max_retries,
self.proxy, imap_server,
)
try:
client = ProxyIMAPClient(
host=imap_server,
proxy=self.proxy,
subjects=self._subjects,
fingerprint=fingerprint,
use_uid=True,
ssl=True,
timeout=self.timeout,
)
client.login(self.account.login, self.account.password)
if fingerprint:
send_imap_id(client, fingerprint)
logger.info(
"[%s] Connecté (tentative %d). Sujets recherchés : %s",
self.account.login, attempt, self._subjects,
)
return client
except Exception as exc:
last_exc = exc
logger.warning(
"[%s] Échec connexion/login (tentative %d/%d) : %s",
self.account.login, attempt, self.max_retries, exc,
)
if attempt < self.max_retries:
delay = self.retry_delay * (2 * (attempt - 1))
logger.info(
"[%s] Nouvelle tentative dans %.1f s…",
self.account.login, delay,
)
time.sleep(delay)
raise ConnectionError(
f"[{self.account.login}] Impossible de se connecter après "
f"{self.max_retries} tentative(s). Dernière erreur : {last_exc}"
) from last_exc
# ── Lecture des dossiers ─────────────────────────────────
def _list_folders(self, client: ProxyIMAPClient) -> List[str]:
return [info[-1] for info in client.list_folders()]
# ── Lecture des messages ─────────────────────────────────
def _read_folder(
self,
client: ProxyIMAPClient,
folder: str,
since: Optional[datetime.datetime] = None,
) -> List[MailResult]:
results: List[MailResult] = []
since = since or datetime.datetime.today()
try:
client.select_folder(folder)
except Exception as exc:
logger.warning("[%s] Impossible d'ouvrir '%s' : %s",
self.account.login, folder, exc)
return results
try:
messages = client.search(['SINCE', since])
except Exception as exc:
logger.warning("[%s] Recherche échouée dans '%s' : %s",
self.account.login, folder, exc)
return results
if not messages:
return results
print("uids {}".format(messages))
logger.info("[%s] %d message(s) dans '%s'",
self.account.login, len(messages), folder)
for uid, msg_data in client.fetch(messages, 'RFC822').items():
try:
raw = msg_data.get(b'RFC822') or msg_data.get('RFC822')
if raw is None:
continue
em = email.message_from_bytes(raw)
from_address = em.get('FROM', '')
subject = em.get('subject', '')
to_addr = em.get('To', self.account.login)
message_id = em.get('Message-ID', '').strip()
body = ""
for part in em.walk():
print(part.get_content_type())
if part.get_content_type() == "text/html":
payload = part.get_payload(decode=True)
if payload:
body = body + payload.decode("utf-8", errors="ignore")
elif part.get_content_type() == "text/plain":
body = body + str(part.get_payload())
logger.info("mail is {} and subject is {}, body is {}".format(
self.account.login, subject, body))
# Filtrer selon les sujets configurés
if not self._subjects or any(s in subject for s in self._subjects):
result = MailResult(
account=self.account.login,
subject=subject,
from_address=from_address,
to_address=to_addr,
body=body,
message_id=message_id,
)
results.append(result)
except Exception as error:
print(error)
print("error trying to read email_message for {}".format(self.account.login))
return results
# ── Point d'entrée public ────────────────────────────────
def read(
self,
since: Optional[datetime.datetime] = None,
skip_folders: Optional[List[str]] = None,
) -> List[MailResult]:
"""
Se connecte au serveur IMAP via le proxy et retourne la liste
des emails de validation trouvés depuis `since` (aujourd'hui par défaut).
Paramètres
----------
since : datetime, optional — date de début de recherche
skip_folders : list[str], optional — dossiers à ignorer
(défaut : ["Sent", "Drafts", "Trash", "Junk", "Spam",
"[Gmail]/All Mail", "[Gmail]/Starred",
"[Gmail]/Important"])
"""
if skip_folders is None:
skip_folders = [
"Sent", "Drafts", "Trash", "Junk", "Spam",
# Dossiers Gmail qui dupliquent le contenu d'INBOX
"[Gmail]/All Mail", "[Gmail]/Starred", "[Gmail]/Important",
]
all_results: List[MailResult] = []
seen_message_ids: set = set()
client = self._connect(login_email=self.account.login)
try:
folders = self._list_folders(client)
logger.info("[%s] Dossiers : %s", self.account.login, folders)
for folder in folders:
if folder in skip_folders:
logger.debug("[%s] Dossier ignoré : %s",
self.account.login, folder)
continue
for result in self._read_folder(client, folder, since):
dedup_key = _dedup_key(result)
if dedup_key in seen_message_ids:
logger.debug(
"[%s] Doublon ignoré (clé=%s) dans '%s'",
self.account.login, str(dedup_key)[:40], folder,
)
continue
seen_message_ids.add(dedup_key)
all_results.append(result)
finally:
try:
client.logout()
except Exception:
pass
return all_results
# ──────────────────────────────────────────────────────────────
# Lecture parallèle de plusieurs comptes
# ──────────────────────────────────────────────────────────────
from concurrent.futures import ThreadPoolExecutor, as_completed
def read_multiple_accounts(
    accounts: List[MailAccount],
    proxy: ProxyConfig,
    since: Optional[datetime.datetime] = None,
    max_workers: int = 10,
    timeout: float = 30.0,
) -> List[MailResult]:
    """
    Read several mailboxes concurrently through one shared proxy.

    One ``ProxyMailReader`` is created per account and its ``read`` call is
    submitted to a thread pool. An account whose read raises is logged and
    skipped; the others still contribute their results.

    Returns the combined list of every ``MailResult`` found, in completion
    order of the accounts.
    """
    collected: List[MailResult] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Maps each pending future to the login it belongs to, for logging.
        pending = {}
        for acc in accounts:
            reader = ProxyMailReader(acc, proxy, timeout)
            pending[pool.submit(reader.read, since)] = acc.login
        for finished in as_completed(pending):
            owner = pending[finished]
            try:
                found = finished.result()
                logger.info("[%s] %d email(s) de validation récupéré(s).",
                            owner, len(found))
                collected.extend(found)
            except Exception as exc:
                logger.error("[%s] Erreur : %s", owner, exc)
    return collected
# ──────────────────────────────────────────────────────────────
# Point d'entrée — exemple d'utilisation
# ──────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Usage example: read today's mails of one account through the proxy.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)-8s %(message)s",
        datefmt="%H:%M:%S",
    )
    # ── 1. Configure the proxy ───────────────────────────────
    proxy = ProxyConfig(
        host=os.environ.get("GMX_PROXY_HOST", ""),
        port=int(os.environ.get("GMX_PROXY_PORT", "443")),
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )
    # ── 2. Define the accounts to read ───────────────────────
    # Credentials come from the environment: a real login/password must never
    # be committed to the repository.
    demo_login = os.environ.get("GMX_MAIL_LOGIN", "")
    demo_password = os.environ.get("GMX_MAIL_PASSWORD", "")
    if not demo_login or not demo_password:
        raise SystemExit(
            "Définissez GMX_MAIL_LOGIN et GMX_MAIL_PASSWORD pour lancer l'exemple."
        )
    accounts = [
        MailAccount(login=demo_login, password=demo_password),
    ]
    # ── 3. Run the read ──────────────────────────────────────
    results = read_multiple_accounts(
        accounts=accounts,
        proxy=proxy,
        since=datetime.datetime.today(),
        max_workers=5,
        timeout=30.0,
    )
    # ── 4. Print the results ─────────────────────────────────
    print(f"\n{'=' * 60}")
    print(f" {len(results)} email(s) de validation trouvé(s)")
    print(f"{'=' * 60}\n")
    for r in results:
        print(f" Compte : {r.account}")
        print(f" De : {r.from_address}")
        print(f" Sujet : {r.subject}")
        print(f" URLs : {r.validation_urls or 'aucune'}")
        print(f" {'-' * 56}")
+2 -2
View File
@@ -126,5 +126,5 @@ def check_mails():
if __name__ == '__main__':
# remove_invalid_email()
check_mails()
remove_invalid_email()
# check_mails()
+130 -28
View File
@@ -2,6 +2,7 @@ import datetime
import email
import logging
import sys
import time
from builtins import list
from concurrent.futures import ThreadPoolExecutor
from email.header import decode_header
@@ -11,11 +12,17 @@ from imapclient import IMAPClient
from src.db.mirgration.migration_tools import migre_accepted_appointment
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.mail.mail_constants import create_imap, show_folders
from src.mail.imap_fingerprint import get_fingerprint_for_provider, send_imap_id
from src.mail.imap_proxy_reader import ProxyMailReader, MailAccount, ProxyConfig
from src.mail.mail_constants import create_imap, show_folders, is_gmx_address, is_yahoo_address
from src.mail.mail_reader import get_gmx_proxy_config, get_yahoo_proxy_config
from src.mail.provider_strategy import (
get_strategy, group_mails_by_provider, apply_delay, ProviderStrategy,
)
from src.notification.AcceptedResultPojo import get_accepted_result_from
from src.notification.mailer import Mailer
from src.pojo.ResultEnum import ResultEnum
from src.pojo.mail.mail_pojo import MailPojo, MailAddress
from src.pojo.mail.mail_pojo import MailPojo
CONFIRMATION_SUBJECT_FR = 'Votre=20rendez-vous=20est=20confirm=C3'
CONFIRMATION_SUBJECT_EN = 'confirmed'
@@ -26,6 +33,23 @@ date_format = "%d-%b-%Y" # DD-Mon-YYYY e.g., 3-Mar-2014
FRENCH_CONFIRMED_MESSAGE = "Nous aurons le plaisir de vous accueillir"
def read_gmx_proxy_confirmation_emails(
    mail,
    mails_messages: list,
    proxy_config: ProxyConfig,
    subjects: list = None,
) -> None:
    """
    Read today's mails of one mailbox through the proxy and append them,
    converted to ``MailPojo``, to ``mails_messages``.

    ``mail`` must expose ``.mail`` (the address) and ``.password``.
    ``subjects`` is forwarded unchanged to ``ProxyMailReader``.
    Nothing is returned; ``mails_messages`` is mutated in place.
    """
    credentials = MailAccount(login=mail.mail, password=mail.password)
    proxy_reader = ProxyMailReader(credentials, proxy_config, subjects=subjects)
    for found in proxy_reader.read(since=datetime.datetime.today()):
        pojo = MailPojo(
            subject=found.subject,
            body=found.body,
            from_address=found.from_address,
        )
        pojo.mail_address = mail.mail
        # Fall back to the mailbox address when the message has no recipient.
        pojo.to_address = found.to_address or mail.mail
        pojo.isImapClient = True
        mails_messages.append(pojo)
class MailConfirmationReader():
def __init__(self, login, password):
self.login = login
@@ -34,14 +58,18 @@ class MailConfirmationReader():
def read_emails(self, mails_messages: list) -> list:
imap = create_imap(self.login)
isImapClient = isinstance(imap, IMAPClient)
# authenticate
fingerprint = get_fingerprint_for_provider(self.login)
if isImapClient:
# authenticate
dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(dat, self.login))
send_imap_id(imap, fingerprint)
else:
responseType, dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(responseType, self.login))
send_imap_id(imap, fingerprint)
mail_list = []
print("read mails from {}".format(self.login))
if isImapClient:
@@ -147,7 +175,6 @@ def accept_appointment_found(accepted_result_list: list):
_all_contact_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list()
_all_register_account = MONGO_STORE_MANAGER.get_all_registered_users()
mailer = Mailer()
# sginal = SignalSender()
print(accepted_result_list)
for reserve in accepted_result_list:
result = get_accepted_result_from(reserve, MONGO_STORE_MANAGER, _all_contact_list)
@@ -156,13 +183,20 @@ def accept_appointment_found(accepted_result_list: list):
result.account_password = user.password
mailer.send_email(result, to_all=True)
MONGO_STORE_MANAGER.update_reserve_result(reserve.id, ResultEnum.ACCEPTED, reserve.message)
# sginal.send_result(result)
if len(accepted_result_list) > 0:
migre_accepted_appointment(str(datetime.date.today()))
def find_confirmation_contacts_for_today():
def find_confirmation_contacts_for_today(mode: str = 'default'):
"""
Retourne la liste des boîtes mail à scanner pour aujourd'hui.
Modes disponibles :
- 'default' : comportement habituel (exclut les adresses outlook.com)
- 'all' : toutes les adresses liées aux rendez-vous du jour (y compris outlook)
- 'gmx_only' : uniquement les adresses GMX liées aux rendez-vous du jour
"""
_all_mail_list = MONGO_STORE_MANAGER.get_destination_emails()
_all_appointments_today = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
if len(_all_appointments_today) == 0:
@@ -171,31 +205,104 @@ def find_confirmation_contacts_for_today():
for _item in _all_appointments_today:
for _mail in _all_mail_list:
if _mail.mail == _item.mail:
if mode == 'all':
_mail_list_to_scan.append(_mail)
elif mode == 'gmx_only':
if is_gmx_address(_mail.mail):
_mail_list_to_scan.append(_mail)
else: # 'default'
# do not need to scan outlook
if "outlook.com" not in _mail.mail:
# if _item.url_validated is True:
_mail_list_to_scan.append(_mail)
break
print("Found {} emails to scan".format(len(_mail_list_to_scan)))
print("Found {} emails to scan (mode={})".format(len(_mail_list_to_scan), mode))
return _mail_list_to_scan
def find_confirmation_contacts_mail_list(mail_list):
# mail_list = [MailAddress("saigecong1990@pissmail.com", "cvExXKOP8oY1D@")]
mail_list.append(MailAddress("saigecong1990@pissmail.com", "cvExXKOP8oY1D@"))
mails_messages = []
# read all the emails
with ThreadPoolExecutor(max_workers=200) as executor:
for mail in mail_list:
# if DOMAIN_HOTMAIL not in mail.mail:
def read_mail_with_strategy(mail, mails_messages, strategy: ProviderStrategy, proxy_config=None, subjects=None):
"""根据策略读取邮箱,包含延迟和重试"""
apply_delay(strategy)
if proxy_config is not None:
account = MailAccount(login=mail.mail, password=mail.password)
reader = ProxyMailReader(
account, proxy_config, subjects=subjects,
max_retries=strategy.max_retries,
retry_delay=strategy.retry_backoff,
)
results = reader.read(since=datetime.datetime.today())
for result in results:
mail_pojo = MailPojo(subject=result.subject, body=result.body, from_address=result.from_address)
mail_pojo.mail_address = mail.mail
mail_pojo.to_address = result.to_address or mail.mail
mail_pojo.isImapClient = True
mails_messages.append(mail_pojo)
else:
mail_reader = MailConfirmationReader(mail.mail, mail.password)
executor.submit(mail_reader.read_emails, mails_messages)
mail_reader.read_emails(mails_messages)
def find_confirmation_contacts_mail_list(mail_list, subjects: list = None):
mails_messages = []
gmx_proxy_config = get_gmx_proxy_config()
yahoo_proxy_config = get_yahoo_proxy_config()
grouped_mails = group_mails_by_provider(mail_list)
total_mails = len(mail_list)
completed_count = 0
print(f"共需读取 {total_mails} 个邮箱,分为 {len(grouped_mails)} 个供应商组")
for provider_key, provider_mail_list in grouped_mails.items():
strategy = get_strategy(provider_mail_list[0].mail)
provider_total = len(provider_mail_list)
provider_completed = 0
print(f"[{strategy.name}] 开始处理 {provider_total} 个邮箱 (max_workers={strategy.max_workers})")
with ThreadPoolExecutor(max_workers=strategy.max_workers) as executor:
futures = {}
processed = 0
for mail in provider_mail_list:
proxy_config = None
if is_gmx_address(mail.mail) and gmx_proxy_config is not None:
proxy_config = gmx_proxy_config
elif is_yahoo_address(mail.mail) and yahoo_proxy_config is not None:
proxy_config = yahoo_proxy_config
future = executor.submit(
read_mail_with_strategy, mail, mails_messages, strategy, proxy_config, subjects
)
futures[future] = mail.mail
processed += 1
if strategy.should_wait_after_batch(processed):
time.sleep(strategy.batch_delay)
processed = 0
for future in futures:
try:
future.result(timeout=strategy.timeout)
provider_completed += 1
completed_count += 1
mail_addr = futures[future]
print(f"[{strategy.name}] 进度: {provider_completed}/{provider_total} | 总进度: {completed_count}/{total_mails} | 完成: {mail_addr}")
except Exception as e:
provider_completed += 1
completed_count += 1
mail_addr = futures[future]
print(f"[{strategy.name}] 进度: {provider_completed}/{provider_total} | 总进度: {completed_count}/{total_mails} | 错误: {mail_addr} - {e}")
print(f"[{strategy.name}] 完成处理 {provider_total} 个邮箱")
print(f"全部邮箱读取完成,共读取 {len(mails_messages)} 封邮件")
accepted_appointment_list = []
if len(mails_messages) > 0:
successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
# check the hours
current_hour = datetime.datetime.now().hour
if current_hour < 15:
if current_hour < 22:
# add yesterday's appointment only for morning
successful_items.extend(MONGO_STORE_MANAGER.get_all_successful_items_for_yesterday())
for mail in mails_messages:
@@ -223,17 +330,12 @@ def find_confirmation_contacts_mail_list(mail_list):
return accepted_appointment_list
def read_mails_and_find_confirmation_contacts(all_mails=False):
def read_mails_and_find_confirmation_contacts(all_mails=False, mode: str = 'default', subjects: list = [CONFIRMATION_SUBJECT_FR, CONFIRMATION_SUBJECT_EN]):
if all_mails:
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
else:
mail_list = find_confirmation_contacts_for_today()
return find_confirmation_contacts_mail_list(mail_list)
# mail_list.reverse()
# excel_reader = ExcelHelper()
# mail_list =excel_reader.read_email_pojo(file_name="/Users/lpan/Desktop/hotmail_list.xlsx")
# mail_address3 = MailAddress(mail="taibenchragu1978@onet.pl", password="2J)kyfNgyOZ")
# mail_list = [mail_address3]
mail_list = find_confirmation_contacts_for_today(mode=mode)
return find_confirmation_contacts_mail_list(mail_list, subjects=subjects)
# init_logger()
@@ -244,6 +346,6 @@ if __name__ == '__main__':
# read_mails_and_find_confirmation_contacts()
_mail_list_today = find_confirmation_contacts_for_today()
# print("size is {}".format(len(_mail_list_today)))
find_confirmation_contacts_mail_list(_mail_list_today)
find_confirmation_contacts_mail_list(_mail_list_today, subjects=[CONFIRMATION_SUBJECT_FR, CONFIRMATION_SUBJECT_EN])
# _items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
# accept_appointment_found([random.choice(_items)])
+23 -4
View File
@@ -1,5 +1,3 @@
import imaplib
from imapclient import IMAPClient
DOMAIN_YAHOO = "yahoo.com"
@@ -12,7 +10,7 @@ DOMAIN_ALICE_IT = "alice.it"
DOMAIN_MARS_DTI_NE_JP = "mars.dti.ne.jp"
DOMAN_BTVM_NE_JP = "btvm.ne.jp"
DOMAN_AURORA_DTI_NE_JP = "aurora.dti.ne.jp"
DOMAN_GMAIL = "gmail.com"
DOMAIN_GMAIL = "gmail.com"
DOMAIN_GMX = "gmx.com"
DOMAIN_GMX_NET = "gmx.net"
DOMAIN_GMX_AT = "gmx.at"
@@ -27,6 +25,27 @@ DOMAIN_NAVER = "naver.com"
DOMAIN_INBOX_LV = "inbox.lv"
DOMAIN_GMX_DE = "gmx.de"
# Set of the GMX domain constants declared in this module. is_gmx_address()
# tests a login against each entry by substring.
GMX_DOMAINS = {
DOMAIN_GMX,
DOMAIN_GMX_NET,
DOMAIN_GMX_AT,
DOMAIN_GMX_FR,
DOMAIN_GMX_US,
DOMAIN_GMX_SG,
DOMAIN_GMX_CH,
DOMAIN_GMX_PT,
DOMAIN_GMX_DE,
}
def is_gmx_address(login: str) -> bool:
    """
    Return True when ``login`` contains one of the GMX domains.

    The comparison is case-insensitive so that a login written as
    "User@GMX.de" is recognised as GMX here too, the same way the provider
    grouping lowercases addresses before matching.
    """
    normalized = login.lower()
    return any(domain in normalized for domain in GMX_DOMAINS)
def is_yahoo_address(login: str) -> bool:
    """
    Return True when ``login`` contains the Yahoo domain.

    The comparison is case-insensitive, matching the lowercase matching used
    when addresses are grouped by provider.
    """
    return DOMAIN_YAHOO in login.lower()
DOMAIN_PISS_MAIL = "pissmail.com"
DOMAIN_INCEL_EMAIL = "incel.email"
DOMAIN_SHITPOSTING_EXPERT = "shitposting.expert"
@@ -106,7 +125,7 @@ def create_imap(login: str):
elif DOMAN_BTVM_NE_JP in login:
# imap = imaplib.IMAP4_SSL(BTVM_NE_JP)
imap = IMAPClient(BTVM_NE_JP, use_uid=True)
elif DOMAN_GMAIL in login:
elif DOMAIN_GMAIL in login:
# imap = imaplib.IMAP4_SSL(SEREVER_GMAIL, port=993)
imap = IMAPClient(SEREVER_GMAIL, use_uid=True)
elif DOMAIN_ONET in login:
+52 -1
View File
@@ -1,17 +1,20 @@
import datetime
import email
import os
import re
from builtins import list
from concurrent.futures import ThreadPoolExecutor
from datetime import time
from email.header import decode_header
from email.message import Message
from typing import Optional
from imapclient import IMAPClient
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.mail.mail_constants import create_imap, show_folders
from src.mail.imap_proxy_reader import ProxyMailReader, ProxyConfig, MailAccount
from src.mail.mail_constants import create_imap, show_folders, is_gmx_address
from src.pojo.mail.mail_pojo import MailPojo
from src.utils.timeutiles import is_time_between
@@ -51,6 +54,50 @@ def find_from_mail(param):
return from_address.strip(" ").strip(">").strip("<")
def get_gmx_proxy_config() -> Optional[ProxyConfig]:
    """
    Build the GMX proxy configuration from ``GMX_PROXY_*`` environment
    variables.

    Returns None when ``GMX_PROXY_HOST`` is unset or empty. A port that is
    not an integer falls back to 443; the proxy type defaults to "SOCKS5".
    """
    env = os.environ
    proxy_host = env.get("GMX_PROXY_HOST", "")
    if proxy_host:
        try:
            proxy_port = int(env.get("GMX_PROXY_PORT", "443"))
        except ValueError:
            proxy_port = 443
        return ProxyConfig(
            host=proxy_host,
            port=proxy_port,
            proxy_type=env.get("GMX_PROXY_TYPE", "SOCKS5"),
            username=env.get("GMX_PROXY_USERNAME"),
            password=env.get("GMX_PROXY_PASSWORD"),
        )
    return None
def get_yahoo_proxy_config() -> Optional[ProxyConfig]:
    """
    Build the Yahoo proxy configuration from ``YAHOO_PROXY_*`` environment
    variables.

    Returns None when ``YAHOO_PROXY_HOST`` is unset or empty. A port that is
    not an integer falls back to 443; the proxy type defaults to "SOCKS5".
    """
    env = os.environ
    proxy_host = env.get("YAHOO_PROXY_HOST", "")
    if proxy_host:
        try:
            proxy_port = int(env.get("YAHOO_PROXY_PORT", "443"))
        except ValueError:
            proxy_port = 443
        return ProxyConfig(
            host=proxy_host,
            port=proxy_port,
            proxy_type=env.get("YAHOO_PROXY_TYPE", "SOCKS5"),
            username=env.get("YAHOO_PROXY_USERNAME"),
            password=env.get("YAHOO_PROXY_PASSWORD"),
        )
    return None
def read_gmx_proxy_emails(mail, mails_messages: list, proxy_config: ProxyConfig) -> None:
    """
    Read today's mails of one mailbox through the proxy and append them,
    converted to ``MailPojo``, to ``mails_messages``.

    ``mail`` must expose ``.mail`` (the address) and ``.password``.
    Nothing is returned; ``mails_messages`` is mutated in place.
    """
    credentials = MailAccount(login=mail.mail, password=mail.password)
    proxy_reader = ProxyMailReader(credentials, proxy_config)
    for found in proxy_reader.read(since=datetime.datetime.today()):
        pojo = MailPojo(
            subject=found.subject,
            body=found.body,
            from_address=found.from_address,
        )
        pojo.mail_address = mail.mail
        # Fall back to the mailbox address when the message has no recipient.
        pojo.to_address = found.to_address or mail.mail
        mails_messages.append(pojo)
class MailReader():
def __init__(self, login, password):
self.login = login
@@ -241,6 +288,7 @@ def read_mails():
if is_time_between(time(7, 30), time(23, 30)):
# get email address
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
gmx_proxy_config = get_gmx_proxy_config()
# excel_reader = ExcelHelper()
# mail_list = excel_reader.read_email_pojo(file_name="/Users/panlei/Downloads/hotmail_list.xlsx")
# mail_address1 = MailAddress(mail="casandrakaamv@onet.pl", password="8F0o0APeAp0z")
@@ -251,6 +299,9 @@ def read_mails():
for mail in mail_list:
# check whether we need to read mail
if need_to_check_email(mail.mail, successful_items):
if is_gmx_address(mail.mail) and gmx_proxy_config is not None:
executor.submit(read_gmx_proxy_emails, mail, mails_messages, gmx_proxy_config)
else:
mail_reader = MailReader(mail.mail, mail.password)
executor.submit(mail_reader.read_emails, mails_messages)
# get ip_country info
+12 -35
View File
@@ -11,8 +11,8 @@ from imapclient import IMAPClient
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.mail.mail_constants import DOMAIN_HOTMAIL, create_imap
from src.mail.mail_reader import need_to_valid_url
from src.mail.mail_constants import DOMAIN_HOTMAIL, create_imap, is_gmx_address
from src.mail.mail_reader import need_to_valid_url, get_gmx_proxy_config, read_gmx_proxy_emails
from src.pojo.mail.mail_pojo import MailPojo
from src.utils.excel_reader import read_contacts
from src.utils.timeutiles import is_time_between
@@ -262,6 +262,7 @@ def read_all_mails(contact_to_book_list=None):
if contact_to_book_list is None:
contact_to_book_list = MONGO_STORE_MANAGER.get_all_contact_to_book_list()
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
gmx_proxy_config = get_gmx_proxy_config()
mail_list_to_check = []
for contact in contact_to_book_list:
for mail in mail_list:
@@ -273,6 +274,9 @@ def read_all_mails(contact_to_book_list=None):
for mail in mail_list_to_check:
# check whether we need to read mail
if need_to_check_email(mail.mail, successful_items):
if is_gmx_address(mail.mail) and gmx_proxy_config is not None:
executor.submit(read_gmx_proxy_emails, mail, mails_messages, gmx_proxy_config)
else:
mail_reader = MailReader(mail.mail, mail.password)
executor.submit(mail_reader.read_emails, mails_messages)
# 在读邮件时候,可能会有其他的约会提交或者约会的链接确认,所以需要刷新一下成功的列表
@@ -300,56 +304,29 @@ if __name__ == '__main__':
contact_to_book_list = read_contacts(
# file_name="/Users/lpan/Desktop/contact_list_2025-01-16_yahoo_100.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-15.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-24.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-15.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-21.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-20.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-14.xlsx")
# file_name="/Users/lpan/Desktop/contact_aol_200_2025-01-15.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-18.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-17.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-19.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-15.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-01-25.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-13.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-12.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-11.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-04.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-10.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-09.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-06.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-07.xlsx")
# file_name="/Users/lpan/Desktop/extracted_yahoo_contacts_129_24_03_win.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-04-02.xlsx")
# file_name="/Users/lpan/Desktop/extracted_aol_contacts_292_24_03_mac.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_197_24_03.xlsx")x
# file_name="/Users/lpan/Desktop/contact_list_2025-03-29.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-31.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_21.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_10.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_14.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_19.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_22.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_2_win.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-29.xlsx")
# file_name="/Users/lpan/Desktop/17_18_04_to_test_win.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_200_aol_win.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_200_aol_mac.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_500_27_03_25_fixed_mac.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-28_mac.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-27_mac.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-27_win.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_1.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_all_12.xlsx")
# file_name="/Users/panlei/Desktop/contact_list_2025-04-08.xlsx")
# file_name="/Users/panlei/Desktop/contact_list_2025-04-10.xlsx")
# file_name="/Users/panlei/Desktop/contact_list_2024-10-02.xlsx")
file_name="/Users/panlei/Desktop/contact_list_2025-08-18_no_ms.xlsx")
# file_name="/Users/panlei/Desktop/real_name_contacts_200_gmx_ch_8_04_mac.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-04-05.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_400_aol_mac.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-04-07.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-04-05.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_100_gmx_ch_05_04_mac.xlsx")
file_name="/Users/lpan/Desktop/real_name_contacts_292_win.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_292_win.xlsx")
# file_name="/Users/lpan/Desktop/real_name_contacts_400_mac_04_04.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-04-01.xlsx")
# file_name="/Users/lpan/Desktop/contact_list_2025-03-08_aol_400.xlsx")
read_all_mails(contact_to_book_list=contact_to_book_list)
# read_all_mails()
+407
View File
@@ -0,0 +1,407 @@
"""
provider_strategy.py
====================
不同邮箱供应商的风控策略配置。
每个供应商有不同的风控规则,需要针对性地调整:
- 并发限制
- 请求延迟
- 重试策略
- 超时时间
- 是否使用代理
"""
import random
import time
from dataclasses import dataclass
from typing import Dict, List
from src.mail.mail_constants import (
DOMAIN_163, DOMAIN_YAHOO, DOMAIN_GMAIL, DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM,
DOMAIN_WEB_DE, DOMAIN_RAMBLER_RU, DOMAIN_NAVER, DOMAIN_ONET,
DOMAIN_GAZETA_PL, DOMAIN_INBOX_LV, DOMAIN_SINA, DOMAIN_PISS_MAIL, DOMAIN_INCEL_EMAIL,
DOMAIN_SHITPOSTING_EXPERT, DOMAIN_HATESJE_WS, DOMAIN_CHILD_PIZZA,
DOMAIN_GENOCIDE_FUN, DOMAIN_DMC_CHAT, GMX_DOMAINS,
)
@dataclass
class ProviderStrategy:
    """
    Throttling and retry settings for one mail provider.

    Attributes
    ----------
    name : str
        Provider name (used in log output).
    max_workers : int
        Maximum number of concurrent threads for this provider.
    min_delay : float
        Lower bound, in seconds, of the random delay before an operation.
    max_delay : float
        Upper bound, in seconds, of the random delay before an operation.
    max_retries : int
        Maximum number of retries when a connection fails.
    retry_backoff : float
        Base, in seconds, of the exponential backoff between retries.
    timeout : float
        Connection timeout in seconds.
    use_proxy : bool
        Whether a proxy is required for this provider.
    batch_size : int
        Number of mailboxes handled per batch.
    batch_delay : float
        Pause, in seconds, between two batches.
    """

    name: str = "default"
    max_workers: int = 80
    min_delay: float = 1.0
    max_delay: float = 3.0
    max_retries: int = 3
    retry_backoff: float = 2.0
    timeout: float = 30.0
    use_proxy: bool = False
    batch_size: int = 20
    batch_delay: float = 60.0

    def get_delay(self) -> float:
        """Return a random delay drawn between ``min_delay`` and ``max_delay``."""
        low, high = self.min_delay, self.max_delay
        return random.uniform(low, high)

    def get_retry_delay(self, attempt: int) -> float:
        """Return the backoff for retry number ``attempt`` (1-based).

        The delay doubles with every attempt and gets up to one second of
        random jitter added.
        """
        exponential = self.retry_backoff * (2 ** (attempt - 1))
        jitter = random.uniform(0, 1)
        return exponential + jitter

    def should_wait_after_batch(self, processed_count: int) -> bool:
        """Tell whether ``processed_count`` has reached the batch size."""
        return processed_count >= self.batch_size
# Per-provider throttling table, keyed by the provider key that
# get_provider_key() returns. The "default" entry is the fallback used by
# get_strategy() for unknown keys.
# NOTE(review): `use_proxy` is declared here but no visible caller reads it —
# confirm whether it is meant to drive proxy selection.
PROVIDER_STRATEGIES: Dict[str, ProviderStrategy] = {
"gmail": ProviderStrategy(
name="Gmail",
max_workers=5,
min_delay=2.0,
max_delay=5.0,
max_retries=5,
retry_backoff=3.0,
timeout=45.0,
use_proxy=False,
batch_size=10,
batch_delay=120.0,
),
"yahoo": ProviderStrategy(
name="Yahoo",
max_workers=3,
min_delay=3.0,
max_delay=8.0,
max_retries=5,
retry_backoff=4.0,
timeout=60.0,
use_proxy=True,
batch_size=5,
batch_delay=180.0,
),
"gmx": ProviderStrategy(
name="GMX",
max_workers=8,
min_delay=1.5,
max_delay=4.0,
max_retries=8,
retry_backoff=2.5,
timeout=45.0,
use_proxy=True,
batch_size=15,
batch_delay=90.0,
),
"outlook": ProviderStrategy(
name="Outlook/Microsoft",
max_workers=3,
min_delay=5.0,
max_delay=10.0,
max_retries=3,
retry_backoff=5.0,
timeout=60.0,
use_proxy=False,
batch_size=5,
batch_delay=300.0,
),
"163": ProviderStrategy(
name="163",
max_workers=5,
min_delay=2.0,
max_delay=5.0,
max_retries=3,
retry_backoff=3.0,
timeout=30.0,
use_proxy=False,
batch_size=10,
batch_delay=120.0,
),
"rambler": ProviderStrategy(
name="Rambler",
max_workers=8,
min_delay=1.0,
max_delay=3.0,
max_retries=5,
retry_backoff=2.0,
timeout=30.0,
use_proxy=False,
batch_size=20,
batch_delay=60.0,
),
"naver": ProviderStrategy(
name="Naver",
max_workers=5,
min_delay=2.0,
max_delay=4.0,
max_retries=3,
retry_backoff=2.5,
timeout=30.0,
use_proxy=False,
batch_size=10,
batch_delay=90.0,
),
# Also used for gazeta.pl addresses (see get_provider_key).
"onet": ProviderStrategy(
name="Onet",
max_workers=6,
min_delay=1.5,
max_delay=3.5,
max_retries=4,
retry_backoff=2.0,
timeout=35.0,
use_proxy=False,
batch_size=15,
batch_delay=75.0,
),
"web_de": ProviderStrategy(
name="Web.de",
max_workers=6,
min_delay=2.0,
max_delay=5.0,
max_retries=5,
retry_backoff=3.0,
timeout=40.0,
use_proxy=True,
batch_size=12,
batch_delay=100.0,
),
"inbox_lv": ProviderStrategy(
name="Inbox.lv",
max_workers=8,
min_delay=1.0,
max_delay=2.5,
max_retries=3,
retry_backoff=1.5,
timeout=25.0,
use_proxy=False,
batch_size=20,
batch_delay=50.0,
),
"sina": ProviderStrategy(
name="Sina",
max_workers=5,
min_delay=2.0,
max_delay=5.0,
max_retries=3,
retry_backoff=2.5,
timeout=30.0,
use_proxy=False,
batch_size=10,
batch_delay=120.0,
),
# Shared by every domain of the pissmail family (see get_provider_key).
"pissmail": ProviderStrategy(
name="Pissmail (临时邮箱)",
max_workers=15,
min_delay=0.5,
max_delay=1.5,
max_retries=2,
retry_backoff=1.0,
timeout=20.0,
use_proxy=False,
batch_size=30,
batch_delay=30.0,
),
# Fallback for addresses that match no known provider.
"default": ProviderStrategy(
name="默认策略",
max_workers=10,
min_delay=1.0,
max_delay=3.0,
max_retries=3,
retry_backoff=2.0,
timeout=30.0,
use_proxy=False,
batch_size=20,
batch_delay=60.0,
),
}
def get_provider_key(login: str) -> str:
    """
    Map an email address to its provider strategy key.

    The address is lowercased, then tested against each provider's domains
    by substring, in a fixed order; the first provider that matches wins.

    Parameters
    ----------
    login : str
        Email address.

    Returns
    -------
    str
        Provider key such as 'gmail', 'yahoo' or 'gmx'; 'default' when no
        provider matches.
    """
    address = login.lower()
    throwaway_domains = [
        DOMAIN_PISS_MAIL, DOMAIN_INCEL_EMAIL, DOMAIN_SHITPOSTING_EXPERT,
        DOMAIN_HATESJE_WS, DOMAIN_CHILD_PIZZA, DOMAIN_GENOCIDE_FUN, DOMAIN_DMC_CHAT,
    ]
    # Order matters: it reproduces the precedence of the provider checks.
    ordered_rules = (
        ("gmail", (DOMAIN_GMAIL,)),
        ("yahoo", (DOMAIN_YAHOO,)),
        ("gmx", GMX_DOMAINS),
        ("outlook", (DOMAIN_HOTMAIL, DOMAIN_OUTLOOK_COM)),
        ("163", (DOMAIN_163,)),
        ("rambler", (DOMAIN_RAMBLER_RU,)),
        ("naver", (DOMAIN_NAVER,)),
        # gazeta.pl addresses share the Onet strategy.
        ("onet", (DOMAIN_ONET, DOMAIN_GAZETA_PL)),
        ("web_de", (DOMAIN_WEB_DE,)),
        ("inbox_lv", (DOMAIN_INBOX_LV,)),
        ("sina", (DOMAIN_SINA,)),
        ("pissmail", throwaway_domains),
    )
    for provider, domains in ordered_rules:
        if any(domain in address for domain in domains):
            return provider
    return "default"
def get_strategy(login: str) -> ProviderStrategy:
    """
    Look up the rate-control strategy that applies to an e-mail address.

    Parameters
    ----------
    login : str
        E-mail address.

    Returns
    -------
    ProviderStrategy
        The entry of PROVIDER_STRATEGIES for the address's provider key, or
        the "default" entry when that key has no dedicated strategy.
    """
    provider_key = get_provider_key(login)
    fallback = PROVIDER_STRATEGIES["default"]
    return PROVIDER_STRATEGIES.get(provider_key, fallback)
def group_mails_by_provider(mail_list: List) -> Dict[str, List]:
    """
    Bucket mailbox objects by provider strategy key.

    Parameters
    ----------
    mail_list : List
        Mailbox objects; each must expose a ``.mail`` attribute holding the
        e-mail address.

    Returns
    -------
    Dict[str, List]
        Mapping ``{provider_key: [mail_objects]}``; objects keep their input
        order inside each bucket.
    """
    buckets: Dict[str, List] = {}
    for entry in mail_list:
        # setdefault creates the bucket the first time a provider is seen.
        buckets.setdefault(get_provider_key(entry.mail), []).append(entry)
    return buckets
def apply_delay(strategy: ProviderStrategy) -> None:
    """Block the calling thread for the duration returned by ``strategy.get_delay()``."""
    time.sleep(strategy.get_delay())
def apply_batch_delay(strategy: ProviderStrategy) -> None:
    """Block the calling thread for ``strategy.batch_delay`` seconds (the between-batch pause)."""
    pause_seconds = strategy.batch_delay
    time.sleep(pause_seconds)
class RateLimiter:
    """
    Minimal rate limiter that counts requests per provider and pauses
    between batches according to the provider's strategy.
    """

    def __init__(self):
        # provider_key -> number of requests recorded since the last reset
        self._request_counts: Dict[str, int] = {}
        # provider_key -> timestamp; initialised here but not read or written
        # by any method of this class
        self._last_batch_time: Dict[str, float] = {}

    def record_request(self, provider_key: str) -> None:
        """Count one more request for ``provider_key``."""
        seen_so_far = self._request_counts.get(provider_key, 0)
        self._request_counts[provider_key] = seen_so_far + 1

    def should_wait(self, provider_key: str, strategy: ProviderStrategy) -> bool:
        """Return whether the strategy asks for a pause at the current request count."""
        return strategy.should_wait_after_batch(
            self._request_counts.get(provider_key, 0)
        )

    def wait_if_needed(self, provider_key: str, strategy: ProviderStrategy) -> None:
        """Sleep for the batch delay when a pause is due, then zero the counter."""
        if not self.should_wait(provider_key, strategy):
            return
        apply_batch_delay(strategy)
        self._request_counts[provider_key] = 0

    def reset(self, provider_key: str) -> None:
        """Set the request counter of ``provider_key`` back to zero."""
        self._request_counts[provider_key] = 0
if __name__ == "__main__":
    # Manual smoke test: print the strategy selected for one sample address
    # per known provider, plus one address that no rule is expected to match.
    sample_addresses = (
        "user@gmail.com",
        "user@yahoo.com",
        "user@gmx.de",
        "user@gmx.fr",
        "user@outlook.com",
        "user@hotmail.com",
        "user@163.com",
        "user@rambler.ru",
        "user@naver.com",
        "user@onet.pl",
        "user@web.de",
        "user@inbox.lv",
        "user@sina.com",
        "user@pissmail.com",
        "user@unknown.com",
    )

    print("邮箱供应商策略测试:")
    print("=" * 70)
    for address in sample_addresses:
        chosen = get_strategy(address)
        summary = (
            f"{address:30}{chosen.name:15} (max_workers={chosen.max_workers}, "
            f"delay={chosen.min_delay}-{chosen.max_delay}s)"
        )
        print(summary)
+7 -7
View File
@@ -1,18 +1,18 @@
from mrz.generator.td1 import TD1CodeGenerator
first_name = "eryan"
last_name = "dai"
first_name = "kele"
last_name = "mi"
document_number = "XKJ0WSK30"
birth_day = "951211"
# sex = "F"
sex = "M"
birth_day = "991125"
sex = "F"
# sex = "M"
# optinal_data = "MFMLMANK<<<<A9" #14位
nationality = "CHN"
country_code = "FRA"
# optinal_data = "<E10805074" # 14位
optinal_data = "<967343145" #总共11位,最前面那位为空,所以加<
optinal_data = "<570873213" #总共11位,最前面那位为空,所以加<
document_prefix = "IR"
expire_date = "260815"
expire_date = "290818"
if optinal_data is not None:
code = TD1CodeGenerator(document_prefix, country_code, document_number, birth_day, sex, expire_date, nationality,
last_name, first_name, optional_data1=optinal_data)
+77 -8
View File
@@ -16,7 +16,7 @@ DEFAULT_SERIAL_TO_IGNORE = ["47e7e36b", "bitbrowser"]
def upload_contacts_list():
_contacts_to_book = read_contacts(str(Path.home()) + "/Desktop/contact_list_2025-05-20.xlsx")
_contacts_to_book = read_contacts(str(Path.home()) + "/Desktop/contact_list_2026-04-11_FIXED.xlsx")
return _contacts_to_book
@@ -119,7 +119,7 @@ def write_new_contacts_to_excel(valid_contacts: list, file_name=str(datetime.dat
def generate_valid_contact_list_for_day(segment_number=1):
_collection_name = "2026-03-26"
_collection_name = "2026-04-11"
_valid_contact_list = MONGO_STORE_MANAGER.get_all_successful_items_for_one_day(_collection_name)
_all_contacts = MONGO_STORE_MANAGER.get_all_contacts_to_book()
_contact_to_save = []
@@ -131,9 +131,11 @@ def generate_valid_contact_list_for_day(segment_number=1):
if _true_contact.mail == _contact.mail:
_contact.last_name = _true_contact.last_name
_contact.phone = _true_contact.phone
_contact.passport = str(_true_contact.resident_card_number)[:9]
_contact.passport = str(_true_contact.passport)[:9]
_contact.first_name = _true_contact.first_name
_contact.resident_card_number = str(_true_contact.resident_card_number)[:9]
_contact.resident_card_number = str(_true_contact.passport)[:9]
if _contact.mail == "angielovato14903@yahoo.com":
print("no resident card number for " + _contact.mail)
print("{}:{}".format(_true_contact.mail, _true_contact.source_from))
if isinstance(_true_contact.source_from, str) and _true_contact.source_from is not None and len(_true_contact.source_from) > 0:
print(_true_contact.source_from)
@@ -285,14 +287,80 @@ def write_resident_card_number_to_contact_list(file_to_read, file_name="contact_
write_list_with_segment_number(file_name, _contacts_to_book, 1)
def check_resident_card_number(file_path):
    """Validate and repair resident_card_number values of a contact_list Excel file.

    Contacts are loaded with read_contacts(file_path). Any contact whose
    resident_card_number is not a 9-character, digits-only string is printed
    and receives a new value from generate_single_titre_sejour_number().

    When at least one contact was repaired, every contact (repaired or not)
    is written to a new workbook in the same folder, named
    ``<stem>_FIXED<suffix>``. When nothing needed repair, a message is
    printed and no file is created.
    """
    contacts = read_contacts(file_path)

    repaired_any = False
    for contact in contacts:
        raw_number = contact.resident_card_number
        candidate = "" if raw_number is None else str(raw_number)
        if len(candidate) == 9 and candidate.isdigit():
            continue
        # Invalid value: show the contact, then replace the number.
        print(contact)
        contact.resident_card_number = generate_single_titre_sejour_number()
        repaired_any = True

    if not repaired_any:
        print("[OK] Tous les resident_card_number sont valides (9 chiffres). Aucun fichier créé.")
        return

    # Output path: same folder, original name + "_FIXED" + original extension.
    source = Path(file_path)
    output_file = str(source.parent / (source.stem + "_FIXED" + source.suffix))

    # Column layout is meant to match write_new_contacts_to_excel (per the
    # original author's note).
    columns = ['name', 'phone', 'passport', 'email', 'store', 'serial', 'ip_country', 'ua',
               'resident_card_number', 'source_from']
    ua_column = 7  # the only column written through the NaN/inf-tolerant writer

    workbook = xlsxwriter.Workbook(output_file, {'nan_inf_to_errors': True})
    worksheet = workbook.add_worksheet()
    bold = workbook.add_format({'bold': True})

    for col_idx, title in enumerate(columns):
        worksheet.write(0, col_idx, title, bold)

    def write_tolerant(row_idx, col_idx, value):
        """Write value, substituting "" for None, NaN/inf numbers and 'nan'/'inf' strings."""
        try:
            if isinstance(value, str):
                blank = value.lower() in ('nan', 'inf', '-inf')
            elif isinstance(value, (int, float)):
                blank = math.isnan(value) or math.isinf(value)
            else:
                blank = value is None
            worksheet.write(row_idx, col_idx, "" if blank else value)
        except (TypeError, ValueError):
            # Fall back to an empty cell when the value cannot be written.
            worksheet.write(row_idx, col_idx, "")

    for row_idx, info in enumerate(contacts, start=1):
        cells = (
            "{} {}".format(info.last_name, info.first_name),
            info.phone,
            info.passport,
            info.mail,
            info.store,
            info.serial,
            info.ip_country,
            info.ua,
            info.resident_card_number,
            info.source_from,
        )
        for col_idx, cell in enumerate(cells):
            if col_idx == ua_column:
                write_tolerant(row_idx, col_idx, cell)
            else:
                worksheet.write(row_idx, col_idx, cell)

    workbook.close()
    print("Fichier corrigé écrit dans : " + output_file)
if __name__ == '__main__':
# write_resident_card_number_to_contact_list(file_to_read=str(Path.home()) + "/Desktop/contact_list_all_13.xlsx",
# file_name="contact_list_all_13")
# contacts_to_book = upload_contacts_list()
# MONGO_STORE_MANAGER.upload_contact_list(contacts_to_book)
contacts_to_book = upload_contacts_list()
MONGO_STORE_MANAGER.upload_contact_list(contacts_to_book)
# print("start at {}".format(datetime.datetime.now()))
generate_valid_contact_list_for_day(segment_number=2)
# generate_contact_from_mail_list("/Users/lpan/Downloads/邮箱及密码_23_03_25_yahoo.xlsx")
# generate_valid_contact_list_for_day(segment_number=2)
# generate_contact_from_mail_list("/Users/panlei/Downloads/100_yahoo_11_04.xlsx")
# print("end at {}".format(datetime.datetime.now()))
# update_contact_list_not_received_mail()
# get_old_validated_contact_list()
@@ -301,4 +369,5 @@ if __name__ == '__main__':
# merge_contact_list_files(
# "/Users/lpan/Desktop/contact_list_2024-11-06.xlsx"
# ])
# check_resident_card_number(str(Path.home()) + "/Desktop/contact_list_2026-04-11_FIXED.xlsx")
# fix_phone_number_format(str(Path.home()) + "/Desktop/gmx_ch_100_2024-06-13.xlsx")
@@ -32,7 +32,7 @@ def get_random_number(size=7) -> str:
return ran
def generate_single_titre_sejour_number() -> str:
id_number = prefix_tire_de_sejour + get_random_number()
id_number = prefix_tire_de_sejour + get_random_number(size=8)
return id_number
def generate_titre_sejour_number(size=10) -> list:
+10
View File
@@ -0,0 +1,10 @@
import math
def sanitize_excel_value(value):
    """Return ``""`` for ``None`` and for NaN/infinite floats; otherwise return ``value`` unchanged."""
    is_missing = value is None
    # For a float, "not finite" is exactly "NaN or +/-infinity".
    is_non_finite_float = isinstance(value, float) and not math.isfinite(value)
    return "" if (is_missing or is_non_finite_float) else value
+2
View File
@@ -0,0 +1,2 @@
# Expose PORT=4000 to the child process (presumably the port the LiteLLM
# proxy listens on -- confirm against the LiteLLM CLI documentation).
PORT=4000
export PORT
# Start LiteLLM with the configuration in config.yaml from the current directory.
litellm --config config.yaml