"""Fetch appointment-confirmation mails over IMAP and submit their validation links."""
import datetime
import email
import imaplib
import re
from concurrent.futures import ThreadPoolExecutor
from email.header import decode_header
from email.message import Message
from builtins import list

from src import params
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.logs.AppLogging import init_logger
from src.mail.mail_constants import DOMAIN_163, DOMAIN_YAHOO, DOMAIN_SINA, IMAP_SERVER_163, YAHOO_IMAP_SERVER, \
    IMAP_SERVER_SINA, AOL_IMAP_SERVER
from src.pojo.mail.mail_pojo import MailPojo, MailAddress
from src.proxy.proxy_type import ProxyType
from src.utils.timeutiles import is_time_between
from src.workers.link_validator import LinkValidator
from datetime import time

VALIDATION_URL_SUBJECT_fr = 'Validation de votre demande de rendez-vous'
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment'
# Raw string so that "\/" is not an invalid string escape; the pattern value
# itself is unchanged.
VALIDATION_URL_REGEX = r"""https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
HERMES_EMAIL = "no-reply@hermes.com"
# NOTE(review): %b is locale dependent while IMAP expects English month
# abbreviations; this assumes the process runs with the default "C" locale.
date_format = "%d-%b-%Y"  # DD-Mon-YYYY e.g., 3-Mar-2014
REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com"
# Exact-match lookup table derived from REDIRECTION_MAILS. Testing membership
# against the raw string would accept any substring (e.g. "" or "aol.com").
_REDIRECTION_MAIL_SET = frozenset(
    address.strip() for address in REDIRECTION_MAILS.split(",") if address.strip()
)


def _decode_mime_header(raw) -> str:
    """Return the header value *raw* as text, decoding every encoded fragment.

    A missing header (``None``) yields an empty string. Fragments without a
    declared charset, or with a charset Python does not know, are decoded as
    UTF-8 with replacement characters instead of raising.
    """
    if raw is None:
        return ""
    fragments = []
    for value, charset in decode_header(raw):
        if isinstance(value, bytes):
            try:
                value = value.decode(charset or "utf-8", errors="replace")
            except LookupError:
                # e.g. "unknown-8bit": not a real codec name.
                value = value.decode("utf-8", errors="replace")
        fragments.append(value)
    return "".join(fragments)


def _extract_body(message) -> str:
    """Return the decoded text of *message*.

    For a multipart message the payloads of all leaf parts are concatenated
    (decoded as iso-8859-1, which cannot fail); for a single-part message the
    payload is decoded as UTF-8 with replacement characters.
    """
    if not message.is_multipart():
        payload = message.get_payload(decode=True)
        return "" if payload is None else payload.decode(errors="replace")

    chunks = []
    for part in message.walk():
        # Containers have no payload of their own; only leaves carry content.
        if part.is_multipart():
            continue
        payload = part.get_payload(decode=True)
        if payload is not None:
            chunks.append(payload.decode("iso-8859-1"))
    return "".join(chunks)


def _report_failures(futures) -> None:
    """Print the exception of every future in *futures* that failed.

    *futures* is an iterable of ``(label, future)`` pairs. Without this the
    exceptions raised inside worker threads would be silently discarded.
    """
    for label, future in futures:
        error = future.exception()
        if error is not None:
            print("task failed for {}: {}".format(label, error))


class MailReader:
    """Reads the validation mails of one mailbox over IMAP (SSL)."""

    def __init__(self, login, password):
        self.login = login
        self.password = password

    @staticmethod
    def show_folders(imap):
        """Print every folder of the connected *imap* account (debug helper)."""
        for entry in imap.list()[1]:
            flags, _, name = entry.decode().partition(' "/" ')
            print(flags + " = " + name)

    def _imap_server(self):
        """Return the IMAP host matching the login's domain (AOL by default)."""
        known_servers = (
            (DOMAIN_163, IMAP_SERVER_163),
            (DOMAIN_YAHOO, YAHOO_IMAP_SERVER),
            (DOMAIN_SINA, IMAP_SERVER_SINA),
        )
        for domain, server in known_servers:
            if domain in self.login:
                return server
        return AOL_IMAP_SERVER

    @staticmethod
    def _disconnect(imap):
        """Close the selected mailbox, if any, and log out."""
        try:
            imap.close()
        except imaplib.IMAP4.error:
            # CLOSE is only legal while a mailbox is selected; after a failed
            # login or select there is nothing to close.
            pass
        imap.logout()

    def read_emails(self, mails_messages: list) -> list:
        """Fetch today's validation mails from the inbox.

        The mails found are appended to *mails_messages* (shared with the
        caller) and also returned. The connection is always released, even
        when login or fetching raises.
        """
        # create an IMAP4 class with SSL
        imap = imaplib.IMAP4_SSL(self._imap_server())
        mail_list = []
        try:
            # authenticate
            imap.login(self.login, self.password)
            print("read mails from {}".format(self.login))
            # Only the inbox is scanned; other folders such as the junk
            # folder are not.
            for subject in (VALIDATION_URL_SUBJECT_fr, VALIDATION_URL_SUBJECT_EN):
                mail_list.extend(self._get_messages_from_folder(imap, subject=subject))
        finally:
            # close the connection and logout
            self._disconnect(imap)
        mails_messages.extend(mail_list)
        return mail_list

    def _get_messages_from_folder(self, imap, subject, folder="INBOX") -> list:
        """Return the mails in *folder* received since today whose subject matches.

        *subject* is used for the server-side search; each fetched mail is
        kept only if its decoded subject contains one of the two validation
        subjects. An empty list is returned when the folder cannot be
        selected or the search fails.
        """
        status, _ = imap.select(folder)
        if status != "OK":
            print("cannot select folder {} for {}".format(folder, self.login))
            return []

        since = datetime.datetime.today().strftime(date_format)
        status, data = imap.search(None, '(SUBJECT "{}" SINCE "{}")'.format(subject, since))
        if status != "OK" or not data or not data[0]:
            return []

        mail_messages = []
        for message_id in data[0].split():
            # fetch the email message by ID
            status, responses = imap.fetch(message_id.decode("utf-8"), "(RFC822)")
            if status != "OK":
                continue
            body = ''
            for response in responses:
                if not isinstance(response, tuple):
                    continue
                # parse a bytes email into a message object
                parsed = email.message_from_bytes(response[1])
                decoded_subject = _decode_mime_header(parsed["Subject"])
                from_address = _decode_mime_header(parsed.get("From"))
                print("From:", from_address)
                print("Subject:", decoded_subject)
                if parsed.is_multipart():
                    body = body + _extract_body(parsed)
                else:
                    body = _extract_body(parsed)
                    print(body)
                if VALIDATION_URL_SUBJECT_fr in decoded_subject or VALIDATION_URL_SUBJECT_EN in decoded_subject:
                    mail_messages.append(
                        MailPojo(subject=decoded_subject, body=body, from_address=from_address))
        return mail_messages


def clean(text):
    """Return *text* with every non-alphanumeric character replaced by "_"."""
    # clean text for creating a folder
    return "".join(c if c.isalnum() else "_" for c in text)


def need_to_valid_url(url: str, successful_items) -> bool:
    """Tell whether the validation link *url* still has to be visited.

    The appointment id is the 6-character path segment at index 5 of *url*.
    Returns True only when a successful item with that id exists and its
    ``url_validated`` flag is falsy or None; False when there are no items,
    the id is malformed, or no item matches.
    """
    if not successful_items:
        return False
    print("url is :" + url)
    parts = url.split('/')
    if len(parts) <= 5:
        print("id not valid:{}".format(url))
        return False
    appointment_id = parts[5]
    if len(appointment_id) != 6:
        print("id not valid:{}".format(appointment_id))
        return False
    for item in successful_items:
        if item.id == appointment_id:
            # None means "validation state unknown": validate to be safe.
            if item.url_validated is None:
                return True
            return not item.url_validated
    # No booking matches this id: nothing to validate.
    return False


def need_to_check_email(mail: str, successful_items) -> bool:
    """Tell whether the mailbox *mail* has to be read.

    Redirection mailboxes are always read. Any other mailbox is read only
    when a successful item refers to it and that item's link is not yet
    validated (``url_validated`` falsy or None).
    """
    print("successful_items size is " + str(len(successful_items)))
    if mail in _REDIRECTION_MAIL_SET:
        return True
    for item in successful_items:
        # NOTE(review): substring test kept from the original; presumably
        # item.email may hold more than the bare address - confirm.
        if mail in item.email:
            if item.url_validated is None:
                # if url_validated is None, need to check email
                return True
            print("url_validated for {} is {}".format(mail, item.url_validated))
            return not item.url_validated
    # if the email has not been booked, we needn't read mails.
    return False


def read_mails():
    """Read the destination mailboxes and submit the validation links found.

    Does nothing outside the 09:30-19:30 window. Mailboxes are read in
    parallel, then every mail whose body contains a validation URL that
    still needs validating is handed to a LinkValidator, also in parallel.
    Failures inside worker threads are printed once each pool has finished.
    """
    # check time before start checking emails
    if not is_time_between(time(9, 30), time(19, 30)):
        return

    # get email address
    # NOTE: the commented-out hard-coded test accounts were removed;
    # credentials must not live in source control.
    mail_list = MONGO_STORE_MANAGER.get_destination_emails()
    successful_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    mails_messages = []

    read_futures = []
    with ThreadPoolExecutor(max_workers=20) as executor:
        for mail in mail_list:
            # check whether we need to read mail
            if need_to_check_email(mail.mail, successful_items):
                mail_reader = MailReader(mail.mail, mail.password)
                read_futures.append(
                    (mail.mail, executor.submit(mail_reader.read_emails, mails_messages)))
    _report_failures(read_futures)

    validation_futures = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for mail in mails_messages:
            match = re.search(VALIDATION_URL_REGEX, mail.body)
            if not match:
                continue
            url = match.group(0)
            if need_to_valid_url(url, successful_items):
                MONGO_STORE_MANAGER.save_links_to_validate(url)
                url_validator = LinkValidator(url)
                validation_futures.append(
                    (url, executor.submit(url_validator.start_page,
                                          params.get_proxy(ProxyType.OXYLABS), True)))
            else:
                # NOTE(review): MailPojo is built with from_address in this
                # file; confirm that it also exposes mail_address.
                print("do not need to click url --> {}".format(mail.mail_address))
    _report_failures(validation_futures)


if __name__ == '__main__':
    init_logger()
    read_mails()