"""Poll mailboxes for appointment-confirmation mails and queue their links.

The module logs in to every destination mailbox (directly over IMAP, or through
a proxy for GMX accounts), collects today's confirmation mails, extracts the
validation URL from each body and hands the URLs that still need to be
validated to ``MONGO_STORE_MANAGER``.
"""
import datetime
import email
import os
import re
from builtins import list
from concurrent.futures import ThreadPoolExecutor
from datetime import time
from email.header import decode_header
from email.message import Message
from email.utils import parseaddr
from typing import Optional

from imapclient import IMAPClient

from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.mail.imap_proxy_reader import ProxyMailReader, ProxyConfig, MailAccount
from src.mail.mail_constants import create_imap, show_folders, is_gmx_address
from src.pojo.mail.mail_pojo import MailPojo
from src.utils.timeutiles import is_time_between

VALIDATION_URL_SUBJECT_fr = 'Votre demande de rendez-vous'
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request'
# Raw strings: the pattern text is unchanged, but `\/` no longer triggers an
# invalid-escape-sequence warning.
VALIDATION_URL_REGEX = r"""https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
PART_VALIDATION_URL_REGEX = r"""client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
HERMES_EMAIL = "no-reply@hermes.com"
# The TLD class used to be `[A-Z|a-z]`, which also accepted a literal `|`.
EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
date_format = "%d-%b-%Y"  # DD-Mon-YYYY e.g., 3-Mar-2014
REDIRECTION_MAILS = "chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com,ciccidaniel@aol.com,armasgoodman@aol.com,wknd.gemerine@aol.com,rafmail1981@aol.com,tonovichivanenaki@aol.com,hetland.ari@aol.com,mateusiversen@aol.com,anasida76@aol.com,sen70zib@aol.com,mezeiderrick@aol.com,stanisl49avchic@aol.com,damcvrobaneuron@aol.com,suyzanna_fleona@aol.com,dxealing.dissa@aol.com,hogg.karen@aol.com,obocharovamarina@aol.com,buchholzjohann@aol.com,orn.cecchini@aol.com,percivaltorgersen@aol.com,candalgudrun@aol.com,filimonis.76@aol.com,bengann_100@aol.com,axelhanne@aol.com,tiffanylarochelle@aol.com,nicoleta.r@aol.com,eichenbaum.1963@aol.com,samognat32@aol.com,edem_headshot@aol.com,kozmakuzmich1960@aol.com,anders.riva@aol.com,yasiaforever.1971@aol.com,mo90nroe@aol.com,dimidrol.1969@aol.com,duwei1998@gmx.com"
# Exact-match lookup for the addresses above; a plain `in REDIRECTION_MAILS`
# is a substring test and would also accept fragments such as "aol.com".
_REDIRECTION_MAIL_SET = frozenset(
    address.strip() for address in REDIRECTION_MAILS.split(",") if address.strip()
)


def check_email_address(email):
    """Return True when *email* fully matches ``EMAIL_ADDRESS_REGEX``.

    Prints "Valid Email" / "Invalid Email" as a side effect.
    """
    if re.fullmatch(EMAIL_ADDRESS_REGEX, email):
        print("Valid Email")
        return True
    print("Invalid Email")
    return False


def _decode_header_chunk(value, charset, fallback_charset=None):
    """Decode one ``decode_header`` chunk to ``str``.

    ``decode_header`` may return bytes with a ``None`` charset; in that case
    *fallback_charset* (then UTF-8) is used instead of failing.
    """
    if not isinstance(value, bytes):
        return value
    try:
        return value.decode(charset or fallback_charset or "utf-8", errors="replace")
    except LookupError:
        # Unknown charset label in the header (e.g. "unknown-8bit").
        return value.decode("utf-8", errors="replace")


def _decode_header_text(raw_header):
    """Return *raw_header* with all of its encoded words decoded and joined.

    Returns an empty string when the header is missing.
    """
    if raw_header is None:
        return ""
    return "".join(
        _decode_header_chunk(value, charset)
        for value, charset in decode_header(raw_header)
    )


def _read_address_header(raw_header):
    """Extract the address from a From/To header, or None if it is absent."""
    if raw_header is None:
        return None
    return find_from_mail(decode_header(raw_header))


def find_from_mail(param):
    """Extract a bare e-mail address from the output of ``decode_header``.

    :param param: list of ``(value, charset)`` tuples as returned by
        ``email.header.decode_header`` for a From/To header.
    :return: the address without surrounding spaces or angle brackets. If no
        valid address can be isolated, the best-effort stripped text is
        returned; an empty *param* yields an empty string.
    """
    if not param:
        return ""

    first_value, first_charset = param[0]
    address = _decode_header_chunk(first_value, first_charset)
    # Typical encoded header: chunk 0 is the display name, chunk 1 the address.
    if not check_email_address(address) and len(param) == 2:
        second_value, second_charset = param[1]
        address = _decode_header_chunk(second_value, second_charset, first_charset)
    address = address.strip(" ").strip(">").strip("<")
    if re.fullmatch(EMAIL_ADDRESS_REGEX, address):
        return address

    # Unencoded "Name <user@host>" headers arrive as a single chunk, which the
    # stripping above turns into "Name <user@host". Let the standard parser
    # isolate the address in that case.
    full_header = "".join(
        _decode_header_chunk(value, charset, first_charset) for value, charset in param
    )
    _, parsed_address = parseaddr(full_header)
    if parsed_address and re.fullmatch(EMAIL_ADDRESS_REGEX, parsed_address):
        return parsed_address
    return address


def get_gmx_proxy_config() -> Optional[ProxyConfig]:
    """Build the GMX proxy configuration from ``GMX_PROXY_*`` environment variables.

    :return: a ``ProxyConfig``, or None when ``GMX_PROXY_HOST`` is unset or
        empty. A non-numeric ``GMX_PROXY_PORT`` falls back to 443.
    """
    host = os.environ.get("GMX_PROXY_HOST", "")
    if not host:
        return None
    try:
        port = int(os.environ.get("GMX_PROXY_PORT", "443"))
    except ValueError:
        port = 443
    return ProxyConfig(
        host=host,
        port=port,
        proxy_type=os.environ.get("GMX_PROXY_TYPE", "SOCKS5"),
        username=os.environ.get("GMX_PROXY_USERNAME"),
        password=os.environ.get("GMX_PROXY_PASSWORD"),
    )


def read_gmx_proxy_emails(mail, mails_messages: list, proxy_config: ProxyConfig) -> None:
    """Read today's mails of a GMX account through a proxy.

    :param mail: account object exposing ``mail`` (login) and ``password``.
    :param mails_messages: shared output list; one ``MailPojo`` is appended
        per message returned by ``ProxyMailReader``.
    :param proxy_config: proxy settings passed to ``ProxyMailReader``.
    """
    account = MailAccount(login=mail.mail, password=mail.password)
    results = ProxyMailReader(account, proxy_config).read(since=datetime.datetime.today())
    for result in results:
        mail_pojo = MailPojo(subject=result.subject, body=result.body, from_address=result.from_address)
        mail_pojo.mail_address = mail.mail
        mail_pojo.to_address = result.to_address or mail.mail
        mails_messages.append(mail_pojo)


class MailReader():
    """Reads today's confirmation mails of one mailbox over IMAP."""

    def __init__(self, login, password):
        self.login = login
        self.password = password

    def read_emails(self, mails_messages: list) -> list:
        """Log in, scan every folder and collect the confirmation mails.

        ``create_imap`` may return either an ``IMAPClient`` or another client
        that is driven here with ``select``/``search``/``fetch``/``close``;
        each kind is read by its own helper.

        :param mails_messages: shared output list, extended with the result.
        :return: the list of ``MailPojo`` objects found for this mailbox.
        """
        imap = create_imap(self.login)
        is_imap_client = isinstance(imap, IMAPClient)
        print("isImapClient is " + str(is_imap_client))
        mail_list = []
        try:
            # authenticate
            if is_imap_client:
                dat = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(dat, self.login))
            else:
                response_type, dat = imap.login(self.login, str(self.password))
                print("type is {} for {}".format(response_type, self.login))

            print("read mails from {}".format(self.login))
            for folder in show_folders(imap):
                if is_imap_client:
                    print("folder is " + folder)
                    mail_list.extend(self._get_messages_from_folder_for_imapclient(imap, folder=folder))
                else:
                    print("folder is {}".format(folder))
                    mail_list.extend(
                        self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_fr, folder=folder))
                    mail_list.extend(
                        self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN, folder=folder))
        finally:
            # Always release the connection: the IMAPClient path used to stay
            # logged in, and nothing was closed when reading failed.
            self._disconnect(imap, is_imap_client)

        mails_messages.extend(mail_list)
        return mail_list

    @staticmethod
    def _disconnect(imap, is_imap_client):
        """Best-effort teardown of *imap*; failures are printed, not raised."""
        if not is_imap_client:
            try:
                imap.close()
            except Exception as error:
                # e.g. no folder was ever selected
                print("error closing mailbox: {}".format(error))
        try:
            imap.logout()
        except Exception as error:
            print("error logging out: {}".format(error))

    def _get_messages_from_folder(self, imap, subject, folder="INBOX") -> list:
        """Collect today's mails in *folder* whose subject contains *subject*.

        Used for clients that are not ``IMAPClient`` instances.

        :param imap: logged-in client.
        :param subject: text used in the server-side SUBJECT search.
        :param folder: folder to select before searching.
        :return: list of ``MailPojo`` for messages whose decoded subject
            contains one of the two validation subjects.
        """
        imap.select(folder)
        mail_messages = []
        query = '(SUBJECT "{}" SINCE "{}")'.format(
            subject, datetime.datetime.today().strftime(date_format))
        typ, data = imap.search(None, query)
        if not data or not data[0]:
            return mail_messages

        for message_id in data[0].split():
            # fetch the email message by ID
            res, fetched = imap.fetch(message_id.decode("utf-8"), "(RFC822)")
            body = ''
            for response in fetched:
                if not isinstance(response, tuple):
                    continue
                # parse a bytes email into a message object
                msg = email.message_from_bytes(response[1])
                # decode the whole subject, not only its first encoded chunk
                decoded_subject = _decode_header_text(msg["Subject"])
                # decode sender and recipient; a missing header yields None
                from_address = _read_address_header(msg.get("From"))
                to_email = _read_address_header(msg.get("To"))
                print("Email:", self.login)
                print("From:", from_address)
                print("To:", to_email)
                print("Subject:", decoded_subject)

                if msg.is_multipart():
                    for part in msg.walk():
                        try:
                            payloads = part.get_payload()
                            if isinstance(payloads, list):
                                for payload in payloads:
                                    if isinstance(payload, Message):
                                        # A nested multipart child decodes to
                                        # None and raises here; its leaves are
                                        # reached later by walk().
                                        body = body + payload.get_payload(decode=True).decode("iso-8859-1")
                        except Exception as Error:
                            print(Error)
                else:
                    body = msg.get_payload(decode=True).decode(errors="replace")
                    print(body)

                if VALIDATION_URL_SUBJECT_fr in decoded_subject or VALIDATION_URL_SUBJECT_EN in decoded_subject:
                    mail = MailPojo(subject=decoded_subject, body=body, from_address=from_address)
                    mail.to_address = self.login if to_email is None else to_email
                    mail.mail_address = self.login
                    mail_messages.append(mail)
        return mail_messages

    def _get_messages_from_folder_for_imapclient(self, imap, folder="INBOX") -> list:
        """Collect today's confirmation mails in *folder* using ``IMAPClient``.

        A message that cannot be parsed is reported and skipped.

        :param imap: logged-in ``IMAPClient``.
        :param folder: folder to select before searching.
        :return: list of ``MailPojo`` whose subject matches a validation subject.
        """
        mail_messages = []
        search_terms = 'SINCE "{}"'.format(datetime.datetime.today().strftime(date_format))
        print("search terms is " + search_terms)
        imap.select_folder(folder)
        messages = imap.search(['SINCE', datetime.datetime.today()])
        print("%d messages from our best friend" % len(messages))
        for uid, message_data in imap.fetch(messages, 'RFC822').items():
            try:
                email_message = email.message_from_bytes(message_data[b'RFC822'])
                from_address = email_message.get('FROM')
                subject = email_message.get('subject') or ""
                decoded_subject = _decode_header_text(subject)

                # NOTE(review): this used to be guarded by
                #   `"no-reply@hermes.com" in from_address or "appointment2022@aol.com"`
                # which is always true (a non-empty literal), so the sender was
                # never actually filtered. The effective behaviour is kept: all
                # messages are inspected and the subject decides. Confirm
                # whether a real sender filter is wanted before adding one.
                body = ""
                for part in email_message.walk():
                    content_type = part.get_content_type()
                    print(content_type)
                    if content_type == "text/html":
                        body = body + part.get_payload(decode=True).decode("utf-8", errors="replace")
                    elif content_type == "text/plain":
                        body = body + part.get_payload()

                # The raw header is checked as before (including its
                # quoted-printable form); the decoded text is checked too so
                # other encodings of the same subject are recognised.
                is_validation_mail = (
                    VALIDATION_URL_SUBJECT_fr in subject
                    or VALIDATION_URL_SUBJECT_EN in subject
                    or "Votre=20demande=20de=20rendez-vous" in subject
                    or VALIDATION_URL_SUBJECT_fr in decoded_subject
                    or VALIDATION_URL_SUBJECT_EN in decoded_subject
                )
                if is_validation_mail:
                    mail = MailPojo(subject=subject, body=body, from_address=from_address)
                    mail.isImapClient = True
                    print("email is {}".format(self.login))
                    print("body is {}".format(body))
                    print("subject is {}".format(subject))
                    # `not` also covers a None default, where len() would raise
                    if not mail.to_address:
                        mail.to_address = self.login
                    # keep in line with the other readers, which always set it
                    mail.mail_address = self.login
                    mail_messages.append(mail)
            except Exception as error:
                print(error)
                print("error trying to read email_Message for {}".format(self.login))
        return mail_messages


def need_to_valid_url(url: str, email, successful_items):
    """Decide whether a validation *url* still has to be visited.

    The registration id is the sixth ``/``-separated segment of the URL and
    must be exactly six characters long.

    :param url: validation URL found in a mail body.
    :param email: recipient address of the mail.
    :param successful_items: items exposing ``id``, ``mail`` and ``url_validated``.
    :return: ``(item, needed)``. *item* is the entry matched by id, else the
        first entry matched by mail, else None. *needed* is False when the id
        is malformed or the id-matched item is already validated.
    """
    print("url is :" + url)
    parts = url.split('/')
    item_id = parts[5] if len(parts) > 5 else ""
    if len(item_id) != 6:
        print("id not valid:{}".format(item_id))
        return None, False

    for item in successful_items:
        if item.id == item_id:
            if item.url_validated is None:
                # never checked so far -> must be validated
                return item, True
            return item, not item.url_validated

    for item in successful_items:
        if item.mail == email:
            return item, True
    return None, True


def need_to_check_email(mail: str, successful_items) -> bool:
    """Tell whether the mailbox *mail* has to be read.

    :param mail: mailbox address.
    :param successful_items: items exposing ``mail`` and ``url_validated``.
    :return: True for a redirection mailbox, or when *mail* has at least one
        successful item and none of them has ``url_validated is True``;
        False otherwise.
    """
    print("successful_items size is " + str(len(successful_items)))
    if mail in _REDIRECTION_MAIL_SET:
        return True

    items_for_mail = [item for item in successful_items if item.mail == mail]
    if not items_for_mail:
        # no successful item for this address, nothing to look for
        return False
    return not any(item.url_validated is True for item in items_for_mail)


def read_mails():
    """Read all relevant mailboxes and store the links that need validation.

    Does nothing outside the 07:30-23:30 window. Mailboxes are read
    concurrently; a failure in one mailbox is reported and does not stop the
    others.
    """
    # check time before start checking emails
    if not is_time_between(time(7, 30), time(23, 30)):
        return

    mail_list = MONGO_STORE_MANAGER.get_destination_emails()
    gmx_proxy_config = get_gmx_proxy_config()
    successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    mails_messages = []
    pending = []
    with ThreadPoolExecutor(max_workers=200) as executor:
        for mail in mail_list:
            # check whether we need to read mail
            if not need_to_check_email(mail.mail, successful_items):
                continue
            if is_gmx_address(mail.mail) and gmx_proxy_config is not None:
                future = executor.submit(read_gmx_proxy_emails, mail, mails_messages, gmx_proxy_config)
            else:
                mail_reader = MailReader(mail.mail, mail.password)
                future = executor.submit(mail_reader.read_emails, mails_messages)
            pending.append((mail.mail, future))

    # All workers are done once the executor block exits. Report their errors,
    # which were silently lost while the futures were discarded.
    for address, future in pending:
        error = future.exception()
        if error is not None:
            print("error reading mails for {}: {}".format(address, error))

    _refreshed_successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    _all_contact_list = MONGO_STORE_MANAGER.get_all_contacts_to_book()
    contact_serial_map_list = MONGO_STORE_MANAGER.get_all_contact_serial_list()
    for mail in mails_messages:
        match = re.search(VALIDATION_URL_REGEX, mail.body or "")
        if not match:
            continue
        url = match.group(0)
        _item, is_need_to = need_to_valid_url(url, mail.to_address, _refreshed_successful_items)
        if is_need_to:
            MONGO_STORE_MANAGER.save_links_to_validate(url, mail.to_address, _all_contact_list, _item,
                                                       contact_serial_map_list)
            print("need to validate url: " + url)
        else:
            print("do not need to click url --> {}".format(mail.mail_address))


if __name__ == '__main__':
    init_logger()
    read_mails()