optimization on check_email_existence.py

This commit is contained in:
Lei PAN
2026-03-11 23:05:44 +01:00
parent f0b01087ff
commit 9ff18e3cca
+122 -19
View File
@@ -1,32 +1,135 @@
import os
import sys
import logging
from pathlib import Path
from typing import Iterable, List, Optional, Set, Any
# Ensure project root is on sys.path so imports like 'src.*' work when running this
# file directly (python src/utils/contacts/check_email_existence.py)
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.utils.excel_reader import read_contacts
# Check whether the emails in the contacts sheet are present in the database
def check_email_existence(file_path, mail_list):
contacts = read_contacts(file_path)
print("mail_list size is " + str(len(mail_list)))
mail_raw_list = []
for mail_address in mail_list:
mail_raw_list.append(mail_address.mail)
logger = logging.getLogger(__name__)
def _extract_email_from_contact(contact: Any) -> Optional[str]:
"""Return the email string from a contact object or dict, or None if missing."""
if contact is None:
return None
# If it's an object with attribute 'mail'
if hasattr(contact, 'mail'):
return getattr(contact, 'mail')
# If it's a dict-like
if isinstance(contact, dict):
return contact.get('mail') or contact.get('email')
# Fallback: try common keys
try:
return contact['mail'] # type: ignore
except Exception:
return None
def _normalize_email(email: Optional[str]) -> Optional[str]:
if not email:
return None
return email.strip().lower()
def check_email_existence(
    file_path: str | Path,
    mail_list: Iterable,
    exclude_domains: Optional[Set[str]] = None,
    verbose: bool = True,
) -> List[str]:
    """Return the contact emails from *file_path* that are absent from *mail_list*.

    Both sides are compared after normalization with ``_normalize_email``, so
    the comparison ignores case and surrounding whitespace.

    Args:
        file_path: path to the contacts file; it is converted to ``str``
            before being handed to ``read_contacts``.
        mail_list: iterable of known addresses. Each entry may be a plain
            string, an object with a ``mail`` (or ``email``) attribute, or a
            dict with a ``'mail'`` (or ``'email'``) key.
        exclude_domains: optional set of domains to leave out of the result
            (e.g. ``{'gmail.com'}``). Matching is case-insensitive and a
            leading ``'@'`` or surrounding whitespace is tolerated.
        verbose: when True, log the number of distinct known addresses and
            print every missing address to stdout.

    Returns:
        The missing addresses, normalized, de-duplicated and sorted.
    """
    # read_contacts is given a plain string path, never a Path object.
    contacts = read_contacts(str(file_path))

    # Set of normalized known addresses: O(1) membership tests below.
    known: Set[str] = set()
    for entry in mail_list:
        if isinstance(entry, str):
            address = entry
        elif hasattr(entry, 'mail'):
            address = getattr(entry, 'mail')
        elif isinstance(entry, dict):
            address = entry.get('mail') or entry.get('email')
        else:
            address = getattr(entry, 'email', None)
        normalized = _normalize_email(address)
        if normalized:
            known.add(normalized)

    if verbose:
        logger.info('mail_list size is %d', len(known))

    # Canonical form of the exclusions: lower-case, no leading '@', no blanks.
    excluded = {
        domain.strip().lower().lstrip('@')
        for domain in (exclude_domains or ())
        if domain
    }

    missing_set: Set[str] = set()
    for contact in contacts or ():
        norm = _normalize_email(_extract_email_from_contact(contact))
        if not norm:
            continue
        # Everything after the first '@'; empty string when there is no '@'.
        domain = norm.partition('@')[2]
        if domain and domain in excluded:
            continue
        if norm not in known:
            missing_set.add(norm)

    missing = sorted(missing_set)
    if verbose:
        for email in missing:
            print(email)
    return missing
if __name__ == '__main__':
    # Command-line entry point: check one contacts file, or every .xlsx file
    # directly inside a folder, against the destination emails in the DB.
    import argparse

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Check contact emails against DB email list')
    parser.add_argument(
        'path',
        nargs='?',
        default=str(Path.home() / 'Desktop' / 'to_check'),
        help='file or folder to check',
    )
    parser.add_argument(
        '--exclude',
        '-e',
        nargs='*',
        default=['gmail.com'],
        help='domains to exclude, e.g. gmail.com',
    )
    args = parser.parse_args()

    path = Path(args.path)
    # Fail at the argument boundary instead of deep inside the file reader.
    if not path.exists():
        parser.error(f'path does not exist: {path}')

    if path.is_dir():
        # Regular files only, case-insensitive extension, stable order.
        targets = sorted(
            entry
            for entry in path.iterdir()
            if entry.is_file() and entry.suffix.lower() == '.xlsx'
        )
    else:
        targets = [path]

    if not targets:
        logger.warning('No .xlsx files found in %s', path)
    else:
        excluded = set(args.exclude)
        # Materialize once so the same data can be iterated for every file,
        # whatever kind of iterable the store manager returns.
        mail_list = list(MONGO_STORE_MANAGER.get_destination_emails())
        for target in targets:
            logger.info('Will check file %s', target)
            missing = check_email_existence(target, mail_list, exclude_domains=excluded)
            if missing:
                logger.info('Missing emails found: %d', len(missing))