"""Report contact e-mail addresses that are missing from the destination e-mail list.

Reconstructed from the patch "optimization on check_email_existence.py"
(commit 9ff18e3cca56fd6844e87342728d76884c541d73), which targets
``src/utils/contacts/check_email_existence.py``.

The module can be imported (``check_email_existence``) or run directly::

    python src/utils/contacts/check_email_existence.py [path] [-e DOMAIN ...]

where ``path`` is either one contacts file or a folder of ``.xlsx`` files.
"""
import argparse
import logging
import os
import sys
from pathlib import Path
from typing import Any, Iterable, List, Optional, Set

# Ensure project root is on sys.path so imports like 'src.*' work when running this
# file directly (python src/utils/contacts/check_email_existence.py).
PROJECT_ROOT = Path(__file__).resolve().parents[3]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

# These imports must come after the sys.path adjustment above.
from src.db.mongo_manager import MONGO_STORE_MANAGER  # noqa: E402
from src.utils.excel_reader import read_contacts  # noqa: E402

logger = logging.getLogger(__name__)


def _extract_email_from_contact(contact: Any) -> Any:
    """Return the raw e-mail value carried by *contact*, or None if there is none.

    Supported shapes, tried in this order:

    * a plain string (returned as is),
    * an object with a ``mail`` attribute,
    * a dict with a ``mail`` or ``email`` key,
    * an object with an ``email`` attribute,
    * anything subscriptable with ``'mail'``.

    The returned value is not validated; pass it through ``_normalize_email``.
    """
    if contact is None:
        return None
    if isinstance(contact, str):
        return contact
    if hasattr(contact, 'mail'):
        return getattr(contact, 'mail')
    if isinstance(contact, dict):
        return contact.get('mail') or contact.get('email')
    if hasattr(contact, 'email'):
        return getattr(contact, 'email')
    # Last resort: mapping-like objects that are not dict subclasses.
    try:
        return contact['mail']
    except (KeyError, IndexError, TypeError):
        return None


def _normalize_email(email: Any) -> Optional[str]:
    """Return *email* stripped and lower-cased, or None if it is not a usable string.

    Non-string values (for example a numeric or NaN spreadsheet cell) and
    empty / whitespace-only strings all yield None instead of raising.
    """
    if not isinstance(email, str):
        return None
    return email.strip().lower() or None


def _normalized_emails(items: Iterable[Any]) -> Set[str]:
    """Return the set of normalized e-mail addresses found in *items*."""
    candidates = (_normalize_email(_extract_email_from_contact(item)) for item in items)
    return {email for email in candidates if email}


def _normalize_domains(domains: Optional[Iterable[str]]) -> Set[str]:
    """Return *domains* lower-cased, trimmed and without a leading '@'."""
    if not domains:
        return set()
    if isinstance(domains, str):
        # A bare string would otherwise be iterated character by character.
        domains = [domains]
    cleaned = {d.strip().lower().lstrip('@') for d in domains}
    # An empty entry must never match addresses that have no domain part.
    cleaned.discard('')
    return cleaned


def check_email_existence(
    file_path: str | Path,
    mail_list: Iterable,
    exclude_domains: Optional[Iterable[str]] = None,
    verbose: bool = True,
) -> List[str]:
    """
    Check which emails in the contacts file are NOT present in mail_list.

    Args:
        file_path: path to the contacts file (passed to read_contacts).
        mail_list: iterable of strings, objects that have a 'mail' (or 'email')
            attribute, or dicts with a 'mail' (or 'email') key.
        exclude_domains: optional domains to ignore from the output
            (e.g. {'gmail.com'}); matching is case-insensitive and a leading
            '@' is tolerated.
        verbose: if True, log the size of the known-address set and print
            every missing email.

    Returns:
        A sorted list of missing emails (unique, normalized lower-case).
    """
    # read_contacts expects a string path, so convert path-like objects first.
    contacts = read_contacts(os.fspath(file_path))

    # Set of normalized addresses for O(1) membership tests.
    known = _normalized_emails(mail_list)
    if verbose:
        logger.info('mail_list size is %d', len(known))

    excluded = _normalize_domains(exclude_domains)

    # partition() yields '' as the domain when there is no '@'; such addresses
    # are never excluded because '' is not allowed in `excluded`.
    missing = sorted(
        email
        for email in _normalized_emails(contacts) - known
        if email.partition('@')[2] not in excluded
    )

    if verbose:
        for email in missing:
            print(email)
    return missing


def _collect_files(path: Path) -> List[Path]:
    """Return the files to check: *path* itself, or the .xlsx files of a folder."""
    if not path.is_dir():
        return [path]
    return sorted(
        entry
        for entry in path.iterdir()
        # '~$name.xlsx' files are lock files Excel creates while a workbook is open.
        if entry.name.endswith('.xlsx') and not entry.name.startswith('~$')
    )


def main(argv: Optional[List[str]] = None) -> None:
    """Command-line entry point: check one file or every .xlsx file of a folder."""
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Check contact emails against DB email list')
    parser.add_argument(
        'path', nargs='?', default=str(Path.home() / 'Desktop' / 'to_check'), help='file or folder to check'
    )
    parser.add_argument('--exclude', '-e', nargs='*', help='domains to exclude, e.g. gmail.com', default=['gmail.com'])
    args = parser.parse_args(argv)

    path = Path(args.path)
    if not path.exists():
        parser.error(f'path does not exist: {path}')

    files = _collect_files(path)
    if not files:
        logger.warning('No .xlsx file found in %s', path)
        return

    exclude_domains = set(args.exclude)
    # Materialize once: the same addresses are reused for every file, which
    # would silently fail with a one-shot iterable.
    mail_list = list(MONGO_STORE_MANAGER.get_destination_emails())

    for file_path in files:
        logger.info('Will check file %s', file_path)
        missing = check_email_existence(file_path, mail_list, exclude_domains=exclude_domains)
        if missing:
            logger.info('Missing emails found: %d', len(missing))


if __name__ == '__main__':
    main()