support onet.pl

This commit is contained in:
2023-04-06 19:44:16 +02:00
parent 640f4be789
commit bcfe4eb283
2 changed files with 89 additions and 72 deletions
+6 -4
View File
@@ -1,5 +1,7 @@
import imaplib
from imapclient import IMAPClient
DOMAIN_YAHOO = "yahoo.com"
DOMAIN_SINA = "sina.com"
DOMAIN_HOTMAIL = "hotmail.com"
@@ -30,11 +32,11 @@ SERVER_IMAGE_ONET = "imap.poczta.onet.pl"
def create_imap(login: str):
# create an IMAP4 class with SSL
if DOMAIN_163 in login:
imap = imaplib.IMAP4_SSL(IMAP_SERVER_163)
imap = IMAPClient(IMAP_SERVER_163, use_uid=True)
elif DOMAIN_YAHOO in login:
imap = imaplib.IMAP4_SSL(YAHOO_IMAP_SERVER)
imap = IMAPClient(YAHOO_IMAP_SERVER, use_uid=True)
elif DOMAIN_SINA in login:
imap = imaplib.IMAP4_SSL(IMAP_SERVER_SINA)
imap = IMAPClient(IMAP_SERVER_SINA, use_uid=True)
elif DOMAIN_HOTMAIL in login:
imap = imaplib.IMAP4_SSL(HOTMAIL_IMAP_SERVER)
elif DOMAIN_RAMBLER_RU in login:
@@ -44,7 +46,7 @@ def create_imap(login: str):
elif DOMAN_GMAIL in login:
imap = imaplib.IMAP4_SSL(SEREVER_GMAIL, port=993)
elif DOMAIN_ONET in login:
imap = imaplib.IMAP4_SSL(SERVER_IMAGE_ONET, port=993)
imap = IMAPClient(SERVER_IMAGE_ONET, use_uid=True)
elif DOMAIN_TIM_IT in login:
imap = imaplib.IMAP4(TIME_IT_SERVER)
elif DOMAIN_ALICE_IT in login:
+79 -64
View File
@@ -14,8 +14,9 @@ from src.pojo.mail.mail_pojo import MailPojo, MailAddress
from src.utils.timeutiles import is_time_between
VALIDATION_URL_SUBJECT_fr = 'Validation de votre demande de rendez-vous'
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment'
VALIDATION_URL_REGEX = """https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request'
# VALIDATION_URL_REGEX = """https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
VALIDATION_URL_REGEX = """client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
HERMES_EMAIL = "no-reply@hermes.com"
date_format = "%d-%b-%Y" # DD-Mon-YYYY e.g., 3-Mar-2014
@@ -39,80 +40,93 @@ class MailReader():
# create an IMAP4 class with SSL
imap = create_imap(self.login)
# authenticate
type, dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(type, self.login))
dat = imap.login(self.login, str(self.password))
print("type is {} for {}".format(dat, self.login))
mail_list = []
print("read mails from {}".format(self.login))
folder_list = self.show_folders(imap)
# total number of emails
# get mails from inbox
# (\Archive \HasNoChildren) = "Archive"
# (\Junk \HasNoChildren) = "Bulk"
# (\Drafts \HasNoChildren) = "Draft"
# (\HasNoChildren) = "Inbox"
# (\Sent \HasNoChildren) = "Sent"
# (\Trash \HasNoChildren) = "Trash"
for folder in folder_list:
mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_fr, folder=folder))
mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN, folder=folder))
print("folder is {}".format(folder))
# folder_list = self.show_folders(imap)
# for folder in folder_list:
# print("folder is {}".format(folder))
mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_fr))
mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN))
if DOMAIN_HOTMAIL in self.login:
mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN, folder="Junk"))
# mail_list.extend(self._get_messages_from_folder(imap, subject=VALIDATION_URL_SUBJECT_EN, folder="Bulk"))
# mail_list.extend(self._get_messages_from_folder(imap, folder="Bulk"))
# close the connection and logout
imap.close()
# imap.close()
imap.logout()
mails_messages.extend(mail_list)
return mail_list
def _get_messages_from_folder(self, imap, subject, folder="INBOX") -> list:
imap.select(folder)
mail_messages = []
typ, data = imap.search(None, '(SUBJECT "{}" SINCE "{}")'.format(subject,
# search_terms = '(SUBJECT "{}" SINCE "{}")'.format(subject,
# datetime.datetime.today().strftime(
# date_format))
search_terms = 'SINCE "{}"'.format(
datetime.datetime.today().strftime(
date_format)))
for i in data[0].split():
# fetch the email message by ID
res, msg = imap.fetch(i.decode("utf-8"), "(RFC822)")
body = ''
for response in msg:
if isinstance(response, tuple):
# parse a bytes email into a message object
msg = email.message_from_bytes(response[1])
# decode the email subject
subject, subject_encoded = decode_header(msg["Subject"])[0]
received_date = msg["Date"]
if isinstance(subject, bytes):
# if it's a bytes, decode to str
subject = subject.decode(subject_encoded)
# decode email sender
from_address, subject_encoded = decode_header(msg.get("From"))[0]
if isinstance(from_address, bytes):
from_address = from_address.decode(subject_encoded)
print("Email:", self.login)
print("From:", from_address)
print("Subject:", subject)
# if the email message is multipart
if msg.is_multipart():
# iterate over email parts
for part in msg.walk():
try:
# get the email body
payloads = part.get_payload()
if isinstance(payloads, list):
for payload in payloads:
if isinstance(payload, Message):
body = body + payload.get_payload(decode=True).decode("iso-8859-1")
# print(body)
except Exception as Error:
print(Error)
else:
body = msg.get_payload(decode=True).decode()
print(body)
date_format))
print("search terms is " + search_terms)
imap.select_folder(folder)
messages = imap.search(['SINCE', datetime.datetime.today()])
print("%d messages from our best friend" % len(messages))
for uid, message_data in imap.fetch(messages, 'RFC822').items():
email_message = email.message_from_bytes(message_data[b'RFC822'])
from_address = email_message.get('FROM')
subject = email_message.get('subject')
# print("{}, {},{}".format(from_address, subject, email_message))
body = ""
if "no-reply@hermes.com" in from_address:
for part in email_message.walk():
print(part.get_content_type())
if part.get_content_type() == "text/html":
body = body + part.get_payload()
elif part.get_content_type() == "text/plain":
body = body + part.get_payload()
if VALIDATION_URL_SUBJECT_fr in subject or VALIDATION_URL_SUBJECT_EN in subject:
mail = MailPojo(subject=subject, body=body, from_address=from_address)
mail_messages.append(mail)
# for i in data[0].split():
# # fetch the email message by ID
# res, msg = imap.fetch(i, "(RFC822)")
# body = ''
# for response in msg:
# if isinstance(response, tuple):
# # parse a bytes email into a message object
# msg = email.message_from_bytes(response[1])
# # decode the email subject
# subject, subject_encoded = decode_header(msg["Subject"])[0]
# received_date = msg["Date"]
# if isinstance(subject, bytes):
# # if it's a bytes, decode to str
# subject = subject.decode(subject_encoded)
# # decode email sender
# from_address, subject_encoded = decode_header(msg.get("From"))[0]
# if isinstance(from_address, bytes):
# from_address = from_address.decode(subject_encoded)
# print("Email:", self.login)
# print("From:", from_address)
# print("Subject:", subject)
# # if the email message is multipart
# if msg.is_multipart():
# # iterate over email parts
# for part in msg.walk():
# try:
# # get the email body
# payloads = part.get_payload()
# if isinstance(payloads, list):
# for payload in payloads:
# if isinstance(payload, Message):
# body = body + payload.get_payload(decode=True).decode("iso-8859-1")
# # print(body)
# except Exception as Error:
# print(Error)
# else:
# body = msg.get_payload(decode=True).decode()
# print(body)
# if VALIDATION_URL_SUBJECT_fr in subject or VALIDATION_URL_SUBJECT_EN in subject:
# mail = MailPojo(subject=subject, body=body, from_address=from_address)
# mail_messages.append(mail)
return mail_messages
@@ -157,10 +171,10 @@ def need_to_check_email(mail: str, successful_items) -> bool:
def read_mails():
# check time before start checking emails
if is_time_between(time(7, 30), time(19, 30)):
if is_time_between(time(7, 30), time(20, 30)):
# get email address
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
# mail_address1 = MailAddress(mail="vincenzoditaranto@tim.it", password="severiana")
# mail_address1 = MailAddress(mail="brasentuncau1973@onet.pl", password="Ex#Mhz9Fom6")
# mail_address1 = MailAddress(mail="chenpeijun@aol.com", password="ytifuwguknzifqyb")
# # mail_address2 = MailAddress(mail="sdfgfhgf1986@aol.com", password="fjwcgvhxxlywqfwm")
# # mail_address3 = MailAddress(mail="ciyuexie@aol.com", password="czezlmmyypokdfce")
@@ -179,9 +193,10 @@ def read_mails():
with ThreadPoolExecutor(max_workers=10) as executor:
for mail in mails_messages:
match = re.search(VALIDATION_URL_REGEX, mail.body)
match = re.search(VALIDATION_URL_REGEX, mail.body.replace("\n", ""))
if match:
url = match.group(0)
url_to_validate= match.group(0)
url = "https://rendezvousparis.hermes.com/"+url_to_validate
if need_to_valid_url(url, successful_items):
MONGO_STORE_MANAGER.save_links_to_validate(url)
# url_validator = LinkValidator(url)