Files
appointment_tool/src/mail/mail_address_validator.py
T

70 lines
2.2 KiB
Python

import imaplib
from concurrent.futures.thread import ThreadPoolExecutor
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.mail.mail_constants import DOMAIN_163, DOMAIN_YAHOO, DOMAIN_SINA, IMAP_SERVER_163, YAHOO_IMAP_SERVER, \
IMAP_SERVER_SINA, AOL_IMAP_SERVER
from src.pojo.mail.mail_pojo import MailAddress
class MailAddressValidator():
def __init__(self, login, password):
self.login = login
self.password = password
@staticmethod
def show_folders(imap):
for i in imap.list()[1]:
l = i.decode().split(' "/" ')
print(l[0] + " = " + l[1])
def create_imap(self):
# create an IMAP4 class with SSL
if DOMAIN_163 in self.login:
imap = imaplib.IMAP4_SSL(IMAP_SERVER_163)
elif DOMAIN_YAHOO in self.login:
imap = imaplib.IMAP4_SSL(YAHOO_IMAP_SERVER)
elif DOMAIN_SINA in self.login:
imap = imaplib.IMAP4_SSL(IMAP_SERVER_SINA)
else:
imap = imaplib.IMAP4_SSL(AOL_IMAP_SERVER)
return imap
def is_valid_email_address(self) -> bool:
# authenticate
imap = self.create_imap()
isValid = True
try:
type, dat = imap.login(self.login, self.password)
print("type is " + type)
imap.logout()
except Exception as error:
print(error)
isValid = False
return isValid
def check_and_save_to_db(self):
if not self.is_valid_email_address():
MONGO_STORE_MANAGER.insert_invalid_mail(MailAddress(self.login, self.password))
print("{} is not valid".format(self.login))
def remove_invalid_email():
invalid = MONGO_STORE_MANAGER.get_invalid_emails()
for mail in invalid:
MONGO_STORE_MANAGER.remove_email_from_destination_email_list(mail)
def find_and_update_invalid_emails():
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
with ThreadPoolExecutor(max_workers=20) as executor:
for mail in mail_list:
valiator = MailAddressValidator(mail.mail, mail.password)
executor.submit(valiator.check_and_save_to_db)
if __name__ == '__main__':
# remove_invalid_email()
find_and_update_invalid_emails()