Merge branch 'feature/extract_name'

This commit is contained in:
2023-04-28 20:22:41 +02:00
9 changed files with 295 additions and 14 deletions
+4 -3
View File
@@ -175,14 +175,15 @@ def read_mails_and_find_confirmation_contacts():
for mail in mails_messages:
message_body = mail.body
for item in successful_items:
if item.id in message_body and item.id != "welcome":
if item.id in message_body and item.id != "welcome" and len(item.id) > 0:
item.message = message_body
accepted_appointment_list.append(item)
elif "10:30" in message_body and (
item.email == mail.mail_address or item.email in message_body) and len(item.passport) > 0:
item.email == mail.mail_address or item.email in message_body) and len(item.id) > 0:
item.message = message_body
accepted_appointment_list.append(item)
elif "11:30" in message_body and (item.email == mail.mail_address or item.email in message_body):
elif "11:30" in message_body and (
item.email == mail.mail_address or item.email in message_body) and len(item.id) > 0:
item.message = message_body
accepted_appointment_list.append(item)
print(mail.mail_address)
+35 -5
View File
@@ -20,11 +20,37 @@ VALIDATION_URL_SUBJECT_EN = 'Please confirm your appointment request'
VALIDATION_URL_REGEX = """https:\/\/rendezvousparis.hermes.com\/client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
PART_VALIDATION_URL_REGEX = """client\/register\/[A-Z0-9]+\/validate.code=[A-Z0-9]+"""
HERMES_EMAIL = "no-reply@hermes.com"
# Pattern for one bare e-mail address. The top-level-domain class is
# [A-Za-z]; writing it as [A-Z|a-z] would also accept a literal "|".
EMAIL_ADDRESS_REGEX = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,7}\b'
date_format = "%d-%b-%Y"  # DD-Mon-YYYY e.g., 3-Mar-2014
REDIRECTION_MAILS = "appointment2022@aol.com, chenpeijun@aol.com,hongjiang176@aol.com,ciyuexie@aol.com"


def check_email_address(email):
    """Tell whether *email* is, in its entirety, a single e-mail address.

    The whole string must match ``EMAIL_ADDRESS_REGEX``; surrounding text such
    as a display name or angle brackets makes the check fail. The verdict is
    also printed ("Valid Email" / "Invalid Email").

    Args:
        email: the candidate string. Any non-string value (for example
            ``None``) is reported as invalid instead of raising.

    Returns:
        ``True`` when the string is a well-formed address, ``False`` otherwise.
    """
    if isinstance(email, str) and re.fullmatch(EMAIL_ADDRESS_REGEX, email):
        print("Valid Email")
        return True
    print("Invalid Email")
    return False
def find_from_mail(param):
    """Pick the e-mail address out of a decoded address header.

    Args:
        param: a list of ``(value, charset)`` pairs in the shape produced by
            ``email.header.decode_header``. ``value`` is ``str`` or ``bytes``;
            ``charset`` may be ``None``.

    Returns:
        The first part that validates with ``check_email_address`` once
        spaces and angle brackets are trimmed from its ends. When no part
        validates, the trimmed last part is returned (for one- and two-part
        headers this is the value the previous implementation returned).
        An empty *param* yields ``""``.
    """
    if not param:
        return ""

    candidates = []
    known_charset = None
    for value, charset in param:
        # A part without its own charset inherits the most recent known one.
        charset = charset or known_charset
        if isinstance(value, bytes):
            try:
                value = value.decode(charset or "utf-8")
            except (LookupError, UnicodeDecodeError):
                # Unknown or wrong charset label: keep going with a lossy
                # decode rather than aborting the whole mailbox scan.
                value = value.decode("utf-8", errors="replace")
        known_charset = charset
        candidates.append(value.strip(" <>"))

    for candidate in candidates:
        if check_email_address(candidate):
            return candidate
    return candidates[-1]
class MailReader():
def __init__(self, login, password):
self.login = login
@@ -92,11 +118,11 @@ class MailReader():
# if it's a bytes, decode to str
subject = subject.decode(subject_encoded)
# decode email sender
from_address, subject_encoded = decode_header(msg.get("From"))[0]
if isinstance(from_address, bytes):
from_address = from_address.decode(subject_encoded)
from_address = find_from_mail(decode_header(msg.get("From")))
to_email = find_from_mail(decode_header(msg.get("To")))
print("Email:", self.login)
print("From:", from_address)
print("To:", to_email)
print("Subject:", subject)
# if the email message is multipart
if msg.is_multipart():
@@ -117,6 +143,10 @@ class MailReader():
print(body)
if VALIDATION_URL_SUBJECT_fr in subject or VALIDATION_URL_SUBJECT_EN in subject:
mail = MailPojo(subject=subject, body=body, from_address=from_address)
if to_email is None:
mail.to_address = self.login
else:
mail.to_address = to_email
mail.mail_address = self.login
mail_messages.append(mail)
return mail_messages
@@ -214,7 +244,7 @@ def read_mails():
if is_time_between(time(7, 30), time(19, 30)):
# get email address
mail_list = MONGO_STORE_MANAGER.get_destination_emails()
# mail_address1 = MailAddress(mail="Saniremvazhaun@yahoo.com", password="hxwgldifdnuacoyr")
# mail_address1 = MailAddress(mail="appointment2022@aol.com", password="gyilpmvyyvlcaviq")
# mail_address1 = MailAddress(mail="chenpeijun@aol.com", password="ytifuwguknzifqyb")
# # mail_address3 = MailAddress(mail="ciyuexie@aol.com", password="czezlmmyypokdfce")
# mail_list = [mail_address1]
@@ -240,7 +270,7 @@ def read_mails():
# else:
url = match.group(0)
if need_to_valid_url(url, successful_items):
MONGO_STORE_MANAGER.save_links_to_validate(url, mail.mail_address)
MONGO_STORE_MANAGER.save_links_to_validate(url, mail.to_address)
# url_validator = LinkValidator(url)
print("need to validate url: " + url)
# executor.submit(url_validator.start_page, params.get_proxy(ProxyType.OXYLABS), False)
View File
+30
View File
@@ -0,0 +1,30 @@
from pypinyin import pinyin, lazy_pinyin, Style
def read_name_from_files_by_line(source_path='/Users/lpan/Downloads/Chinese_Names_Corpus.txt',
                                 target_path='pinyin_list.txt'):
    """Convert a file of names (one per line) into a file of pinyin names.

    Every non-blank line of *source_path* is passed through
    ``convert_name_to_pinyin`` and the result is written as one line of
    *target_path*. Progress is printed for each line.

    Args:
        source_path: UTF-8 text file to read; defaults to the original
            hard-coded corpus location.
        target_path: file to (over)write with the converted names; defaults
            to ``pinyin_list.txt`` in the working directory.
    """
    # Context managers close both files even if a conversion raises.
    with open(source_path, 'r', encoding='utf-8') as source:
        with open(target_path, 'w', encoding='utf-8') as target:
            # Stream the corpus instead of loading it whole with readlines().
            for count, line in enumerate(source, start=1):
                name = line.strip()
                print("Line{}: {}".format(count, name))
                # A blank line, or one holding only a byte-order mark, has no
                # syllables to convert and would make the conversion fail.
                if not name.strip('\ufeff'):
                    continue
                name_to_save = convert_name_to_pinyin(name)
                target.write(name_to_save + "\n")
                print(name_to_save)
def convert_name_to_pinyin(name: str):
    """Render *name* as ``"<first item> <remaining items joined>"``.

    The items come from ``lazy_pinyin(name)``; any item equal to a byte-order
    mark (``'\\ufeff'``) is discarded first. Raises ``IndexError`` when no
    item is left.
    """
    syllables = [part for part in lazy_pinyin(name) if part != '\ufeff']
    leading = syllables[0]
    trailing = "".join(syllables[1:])
    return leading + " " + trailing
# NOTE(review): this call sits at module level with no `if __name__ == '__main__':`
# guard, so the whole conversion also runs whenever this module is imported.
read_name_from_files_by_line()
@@ -0,0 +1,108 @@
import itertools
import random
import xlsxwriter
from src.db.mongo_manager import MongoDbManager
from src.pojo.contact_pojo import ContactPojo
from src.utils.excel_reader import get_random_phone_numbers
from src.utils.generate_random_passport_id import get_random_passport_id_number
def get_ordered_combins(stuff):
    """Return every non-empty contiguous slice of *stuff*, printing each one.

    Slices are ordered by start index and, within one start index, by
    increasing length: ``"abc"`` gives ``["a", "ab", "abc", "b", "bc", "c"]``.
    """
    size = len(stuff)
    slices = []
    for start in range(size):
        for stop in range(start + 1, size + 1):
            piece = stuff[start:stop]
            print(piece)
            slices.append(piece)
    return slices
def get_better_list(list):
    """Return *list* unchanged.

    Placeholder: no filtering is performed and the very same object is
    handed back to the caller.
    """
    return list
# Keyword extraction
def read_pinyin_list_from_file(file_name='clean_list.txt') -> list:
    """Load the pinyin names stored one per line in *file_name*.

    Args:
        file_name: UTF-8 text file to read; defaults to ``clean_list.txt`` in
            the working directory, the location that was previously
            hard-coded.

    Returns:
        One entry per line of the file, in file order, with surrounding
        whitespace removed. Blank lines are kept as empty strings. Each line
        is also printed with its 1-based number.
    """
    name_list = []
    # The context manager guarantees the handle is released; it used to be
    # left open for the life of the process.
    with open(file_name, 'r', encoding='utf-8') as names_file:
        for count, line in enumerate(names_file, start=1):
            name = line.strip()
            print("Line{}: {}".format(count, name))
            name_list.append(name)
    return name_list
def generate_name_from_email(mail_address, pinyin_name_list):
    """Find a known name that shares a fragment with *mail_address*.

    Every contiguous fragment of *mail_address* that is at least three
    characters long is tried, in the order ``get_ordered_combins`` yields
    them. The first entry of *pinyin_name_list* whose last name or first
    name contains the fragment wins.

    Args:
        mail_address: text to mine for fragments (callers pass the part of an
            e-mail address before the ``@``).
        pinyin_name_list: names written as ``"<last> <first>"``; the text
            before the first space is the last name and the text after the
            final space is the first name.

    Returns:
        A ``(last_name, first_name)`` tuple for the first match, or ``None``
        when no fragment matches any name.
    """
    # Split each name once up front; the split does not depend on the
    # fragment under test, so repeating it inside the nested loop is wasted work.
    split_names = []
    for name in pinyin_name_list:
        parts = name.split(" ")
        split_names.append((parts[0], parts[-1]))

    for fragment in get_ordered_combins(mail_address):
        # A no-op for string input; kept so a sequence of strings still works.
        word_to_test = "".join(fragment)
        if len(word_to_test) < 3:
            continue
        for last_name, first_name in split_names:
            if word_to_test in last_name or word_to_test in first_name:
                return last_name, first_name
    return None
def write_new_contacts_to_excel(valid_contacts: list):
    """Write *valid_contacts* to ``real_name_contacts_<count>.xlsx``.

    Row 0 holds the bold headers name / phone / passport / email / note.
    Each contact then gets one row; before it is written, its ``phone`` and
    ``passport`` attributes are overwritten with freshly generated values.
    """
    workbook = xlsxwriter.Workbook('real_name_contacts_{}.xlsx'.format(len(valid_contacts)))
    header_data = ['name', 'phone', 'passport', 'email', 'note']
    worksheet = workbook.add_worksheet()
    header_format = workbook.add_format({'bold': True})

    for col_num, title in enumerate(header_data):
        worksheet.write(0, col_num, title, header_format)

    # Data rows start directly below the header row.
    for row, info in enumerate(valid_contacts, start=1):
        info.phone = get_random_phone_numbers()
        info.passport = get_random_passport_id_number()
        cells = (
            "{} {}".format(info.last_name, info.first_name),
            info.phone,
            info.passport,
            info.mail,
            info.note,
        )
        for offset, value in enumerate(cells):
            worksheet.write(row, offset, value)

    workbook.close()
if __name__ == '__main__':
    # Script entry point: derive a name for each stored e-mail address and
    # export the contacts that received both a last and a first name.
    db_manager = MongoDbManager()
    mail_list = db_manager.get_destination_emails()[101:1000]
    # Other slices used previously: [3001:3200], [50:200]
    generate_contacts = []
    pinyin_name_list = read_pinyin_list_from_file()
    random.shuffle(pinyin_name_list)
    for mail in mail_list:
        contact = ContactPojo(mail=mail.mail, phone_number="", passport_number="", last_name="", first_name="")
        local_part = mail.mail.split("@")[0]
        matched_name = generate_name_from_email(local_part, pinyin_name_list)
        if matched_name is not None:
            contact.last_name, contact.first_name = matched_name[0], matched_name[1]
        has_full_name = len(contact.last_name) > 0 and len(contact.first_name) > 0
        if has_full_name:
            generate_contacts.append(contact)
    write_new_contacts_to_excel(generate_contacts)
+2
View File
@@ -12,6 +12,7 @@ class ContactPojo:
mail: str
ccid: str
position: int
note: str
def __init__(self, phone_number: str, passport_number: str, last_name: str, first_name: str, mail: str,
ccid: str = "",
@@ -23,6 +24,7 @@ class ContactPojo:
self.ccid = ccid
self.mail = mail
self.position = position
self.note = ""
def to_firestore_dict(self):
dest = {
+2
View File
@@ -22,6 +22,7 @@ class MailAddress:
class MailPojo:
from_address: str
to_address: str
body: str
subject: str
mail_address: str = ""
@@ -32,3 +33,4 @@ class MailPojo:
self.subject = subject
self.from_address = from_address
self.isImapClient = False
self.to_address = ""
+7 -6
View File
@@ -39,6 +39,7 @@ class ExcelHelper:
print(user_agent_list)
def read_contacts(self, file_name=CONTACT_LIST_FILE) -> list:
print("read file " + file_name)
contact_list_in_json = pandas.read_excel(file_name).to_json(orient='records')
contact_dict_list = json.loads(contact_list_in_json)
contact_list = []
@@ -60,8 +61,8 @@ class ExcelHelper:
contact_list.append(contact)
return contact_list
def check_contact_list(self):
contact_list = self.read_contacts()
def check_contact_list(self, file_name=CONTACT_LIST_FILE):
contact_list = self.read_contacts(file_name)
for contact in contact_list:
if contact.first_name is None or len(contact.first_name) == 0:
print("error in firstName for " + contact.mail)
@@ -227,12 +228,12 @@ def save_mails_to_db():
if __name__ == '__main__':
# excel_reader = ExcelHelper()
# contacts = excel_reader.read_names("/Users/lpan/Downloads/10_rambler_ru_23_03_2023.xlsx")
# contacts = excel_reader.read_names("/Users/lpan/Downloads/gmail_10.xlsx")
# print(contacts)
# write_new_contacts_to_excel(valid_contacts=contacts)
# excel_reader = ExcelHelper()
# excel_reader.check_contact_list()
save_mails_to_db()
excel_reader = ExcelHelper()
excel_reader.check_contact_list("/Users/lpan/Desktop/contact_email_valid.xlsx")
# save_mails_to_db()
# for mail in excel_reader.read_mails_and_pwd():
# MONGO_STORE_MANAGER.insert_email(mail)
# for i in range(1, 64):
+107
View File
@@ -0,0 +1,107 @@
import itertools
import xlsxwriter
from pyhanlp import *
from Pinyin2Hanzi import DefaultHmmParams
from Pinyin2Hanzi import viterbi
from itertools import combinations
from src.db.mongo_manager import MongoDbManager
from src.pojo.contact_pojo import ContactPojo
from src.utils.excel_reader import get_random_phone_numbers, generate_email_from_name
from src.utils.generate_random_passport_id import get_random_passport_id_number
def get_ordered_combins(stuff):
    """List all non-empty contiguous slices of *stuff* and print each of them.

    The slices are produced from every ``(start, stop)`` pair of boundaries
    with ``start < stop``, so they come out grouped by start position and
    growing in length within each group.
    """
    boundaries = range(len(stuff) + 1)
    pieces = [stuff[low:high] for low, high in itertools.combinations(boundaries, 2)]
    for piece in pieces:
        print(piece)
    return pieces
def get_better_list(list):
    """Hand back *list* as-is; this is a stub that applies no filtering."""
    return list
# Keyword extraction
def generate_name_from_email(mail_address):
    """Collect name fragments and viterbi paths derived from *mail_address*.

    The keywords returned by ``HanLP.extractKeyword(mail_address, 2)`` are
    joined into one string. Every contiguous fragment of that string with at
    least two characters is passed to ``viterbi``; presumably this maps a
    pinyin syllable to Chinese characters (confirm against Pinyin2Hanzi).

    Returns:
        A pair ``(fragments, paths)``. ``fragments`` holds each fragment for
        which ``viterbi`` and the handling of its results completed without
        an exception. ``paths`` holds the concatenated ``path`` items of
        every result processed. Errors are printed and the fragment is
        skipped.
    """
    key_words = HanLP.extractKeyword(mail_address, 2)
    print(key_words)
    hmm_params = DefaultHmmParams()
    accepted_fragments = []
    collected_paths = []
    joined_keywords = "".join(key_words)

    for fragment in get_ordered_combins(joined_keywords):
        candidate = "".join(fragment)
        if len(candidate) < 2:
            continue
        try:
            decoded = viterbi(hmm_params=hmm_params, observations=(candidate,), path_num=2)
            for entry in decoded:
                print(entry.score, entry.path)
                collected_paths.extend(entry.path)
            # Reached only when decoding raised nothing for this fragment.
            accepted_fragments.append(candidate)
        except Exception as error:
            print(error)

    print(accepted_fragments)
    return accepted_fragments, collected_paths
def write_new_contacts_to_excel(valid_contacts: list):
    """Export *valid_contacts* into ``real_name_contacts_<count>.xlsx``.

    A bold header row (name, phone, passport, email, note) is written first,
    followed by one row per contact. Each contact's ``phone`` and
    ``passport`` attributes are replaced with newly generated values just
    before its row is written.
    """
    workbook = xlsxwriter.Workbook('real_name_contacts_{}.xlsx'.format(len(valid_contacts)))
    header_data = ['name', 'phone', 'passport', 'email', 'note']
    worksheet = workbook.add_worksheet()
    header_format = workbook.add_format({'bold': True})

    column = 0
    for title in header_data:
        worksheet.write(0, column, title, header_format)
        column += 1

    next_row = 1
    for info in valid_contacts:
        info.phone = get_random_phone_numbers()
        info.passport = get_random_passport_id_number()
        full_name = "{} {}".format(info.last_name, info.first_name)
        worksheet.write(next_row, 0, full_name)
        worksheet.write(next_row, 1, info.phone)
        worksheet.write(next_row, 2, info.passport)
        worksheet.write(next_row, 3, info.mail)
        worksheet.write(next_row, 4, info.note)
        next_row += 1

    workbook.close()
if __name__ == '__main__':
    # Script entry point: derive name fragments for each stored e-mail
    # address and export the contacts that end up with both name parts.
    db_manager = MongoDbManager()
    mail_list = db_manager.get_destination_emails()[501:1000]
    # mail_list = db_manager.get_destination_emails()[50:200]
    generate_contacts = []
    for mail in mail_list:
        contact = ContactPojo(mail=mail.mail, phone_number="", passport_number="", last_name="", first_name="")
        spliteed = mail.mail.split("@")
        # One call returns both lists; calling it once per list repeated the
        # whole keyword/viterbi pass and its printed output for every address.
        possible_name_list, chinese_name_list = generate_name_from_email(spliteed[0])
        if len(possible_name_list) >= 2:
            contact.last_name = possible_name_list[0]
            # NOTE(review): [1:-1] leaves out the final fragment, so a
            # two-item list gives an empty first name and the contact is
            # dropped below -- confirm this is intended.
            contact.first_name = "".join(possible_name_list[1:-1])
            contact.note = " ".join(chinese_name_list)
        if len(contact.last_name) > 0 and len(contact.first_name) > 0:
            generate_contacts.append(contact)
    write_new_contacts_to_excel(generate_contacts)