Files
appointment_tool/src/person_name/contact_manager.py
T
2025-04-01 21:32:37 +02:00

280 lines
13 KiB
Python
Executable File

import datetime
import random
from pathlib import Path
import xlsxwriter
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.person_name.extract_name_with_pinyinlist import filter_already_validated_contacts, read_pinyin_list_from_file
from src.pojo.contact_pojo import ContactPojo
from src.utils.contacts.generate_random_passport_id import generate_single_titre_sejour_number, \
get_random_passport_id_number
from src.utils.excel_reader import read_contacts, fr_phone_number_prefix, get_random_fr_phone_numbers, ExcelHelper
# from src.person_name.cython_extract_methods import filter_already_validated_contacts, read_pinyin_list_from_file
# Serial values that generate_valid_contact_list_for_day treats as
# placeholders: a contact carrying one of them gets its serial replaced by the
# one stored for the same mail address, when such an entry exists.
DEFAULT_SERIAL_TO_IGNORE = ["47e7e36b", "bitbrowser"]
def upload_contacts_list():
    """Read the contacts workbook stored on the current user's Desktop.

    Returns:
        The value produced by ``read_contacts`` for
        ``<home>/Desktop/real_name_contacts_500_27_03_25_fixed_mac.xlsx``.
    """
    workbook_path = str(Path.home()) + "/Desktop/real_name_contacts_500_27_03_25_fixed_mac.xlsx"
    return read_contacts(workbook_path)
def fix_phone_number_format(file_path):
    """Repair names and phone numbers of a contact workbook and re-export it.

    For every contact read from ``file_path``:

    * when ``first_name`` is empty, ``last_name`` is taken to hold the full
      name and is split on whitespace (non-breaking spaces included): the
      first token becomes ``last_name`` and the second becomes ``first_name``;
    * when ``phone`` starts with ``'7'`` but with none of the prefixes in
      ``fr_phone_number_prefix``, it is replaced by the result of
      ``get_random_fr_phone_numbers()``.

    The contacts are then written through ``write_new_contacts_to_excel`` with
    ``file_name="real_name_contacts_500_27_03_25_fixed"``.

    Args:
        file_path: Path of the workbook handed to ``read_contacts``.
    """
    contacts = read_contacts(file_path)
    # str.startswith accepts a tuple, so a single call tests every prefix.
    valid_prefixes = tuple(fr_phone_number_prefix)

    for contact in contacts:
        if not contact.first_name:
            print(contact)
            # split() without an argument treats "\xa0" as whitespace and
            # collapses repeated separators, so a doubled space no longer
            # produces an empty first name.
            name_parts = (contact.last_name or "").split()
            if len(name_parts) >= 2:
                contact.last_name = name_parts[0]
                contact.first_name = name_parts[1]
            else:
                # Nothing to split: leave the record untouched instead of
                # aborting the whole run with an IndexError.
                print("cannot split full name of {}".format(contact))
            print(contact)

        phone = contact.phone
        if phone.startswith('7') and not phone.startswith(valid_prefixes):
            print(contact)
            contact.phone = get_random_fr_phone_numbers()

    write_new_contacts_to_excel(contacts, file_name="real_name_contacts_500_27_03_25_fixed")
def generate_contact_from_mail_list(mail_list_file,
                                    name_list_file_path="all_new_name_list.txt"):
    """Create one randomised contact per mail account and export them to Excel.

    Each contact receives a random phone number, passport number and resident
    card number, plus a name drawn with ``random.choice`` from the pinyin name
    list (so the same name can be drawn more than once). The contacts are
    exported through ``write_new_contacts_to_excel`` with its default file name.

    Args:
        mail_list_file: Workbook passed to ``ExcelHelper.read_mails_and_pwd``.
        name_list_file_path: File passed to ``read_pinyin_list_from_file``;
            every entry is expected to look like ``"<last> <first>"``.
    """
    mail_list = ExcelHelper().read_mails_and_pwd(mail_list_file)
    print("mail_list size is {}".format(len(mail_list)))
    # The return value is ignored.
    # NOTE(review): presumably this prunes mail_list in place - confirm.
    filter_already_validated_contacts(mail_list)
    print("mail_list size after filter is {}".format(len(mail_list)))

    pinyin_name_list = read_pinyin_list_from_file(name_list_file_path)
    random.shuffle(pinyin_name_list)
    print(pinyin_name_list[0])

    generate_contacts = []
    for mail_entry in mail_list:
        phone_number = get_random_fr_phone_numbers()
        passport_number = get_random_passport_id_number()
        resident_card_number = generate_single_titre_sejour_number()
        name_tokens = random.choice(pinyin_name_list).split(" ")
        last_name, first_name = name_tokens[0], name_tokens[1]

        contact = ContactPojo(
            mail=mail_entry.mail,
            phone_number=phone_number,
            passport=passport_number,
            last_name=last_name,
            first_name=first_name,
            store="random",
        )
        contact.resident_card_number = resident_card_number
        generate_contacts.append(contact)

    write_new_contacts_to_excel(generate_contacts)
def write_new_contacts_to_excel(valid_contacts: list, file_name=None):
    """Export contacts to ``contact_list_<file_name>.xlsx`` in the working directory.

    The sheet holds a bold header row followed by one row per contact, using
    the columns listed in ``header_data``; the ``name`` column is written as
    ``"<last_name> <first_name>"``.

    Args:
        valid_contacts: Contacts to export. Each one must expose the
            attributes read below (``last_name``, ``first_name``, ``phone``,
            ``passport``, ``mail``, ``store``, ``serial``, ``ip_country``,
            ``ua``, ``resident_card_number``, ``source_from``).
        file_name: Suffix inserted into the output file name. Defaults to
            today's date, resolved on every call.
    """
    if file_name is None:
        # Resolved here rather than in the signature: a default expression is
        # evaluated only once, when the function is defined, which would
        # freeze the date at import time.
        file_name = str(datetime.date.today())

    header_data = ['name', 'phone', 'passport', 'email', 'store', 'serial', 'ip_country', 'ua',
                   'resident_card_number', 'source_from']

    # The context manager closes (and thereby saves) the workbook on exit,
    # even when writing a row raises.
    with xlsxwriter.Workbook('contact_list_{}.xlsx'.format(file_name)) as workbook:
        worksheet = workbook.add_worksheet()
        header_format = workbook.add_format({'bold': True})
        for col_num, title in enumerate(header_data):
            worksheet.write(0, col_num, title, header_format)

        # Data rows start right below the header, in the same column order.
        for row, info in enumerate(valid_contacts, start=1):
            row_values = (
                "{} {}".format(info.last_name, info.first_name),
                info.phone,
                info.passport,
                info.mail,
                info.store,
                info.serial,
                info.ip_country,
                info.ua,
                info.resident_card_number,
                info.source_from,
            )
            for col_num, value in enumerate(row_values):
                worksheet.write(row, col_num, value)
def generate_valid_contact_list_for_day(segment_number=1, collection_name="2025-03-31"):
    """Export the validated contacts of one day's collection to Excel.

    Steps:

    1. Load the successful items of ``collection_name`` and force their
       ``store`` to ``"random"``.
    2. For every item whose mail matches a contact returned by
       ``get_all_contacts_to_book``, overwrite name, phone, passport and
       resident card number with that contact's values.
    3. Keep items that are ``url_validated`` and have a non-empty
       ``last_name``, dropping repeats of the same
       (mail, phone, passport, last_name, first_name) combination.
    4. Replace serials listed in ``DEFAULT_SERIAL_TO_IGNORE`` with the serial
       stored for the same mail, when there is one.
    5. Write the full list plus the per-source split files.

    Args:
        segment_number: Forwarded to ``write_list_with_segment_number``.
        collection_name: Day collection to read; also used as the output
            file-name suffix. Defaults to the previously hard-coded date.
    """
    day_items = MONGO_STORE_MANAGER.get_all_successful_items_for_one_day(collection_name)
    # Index by mail instead of rescanning the whole list per item. Later
    # entries overwrite earlier ones, which matches a full scan in which
    # every match overwrites the previous one.
    known_by_mail = {known.mail: known for known in MONGO_STORE_MANAGER.get_all_contacts_to_book()}

    contacts_to_save = []
    seen_identities = set()
    for contact in day_items:
        contact.store = "random"

        known = known_by_mail.get(contact.mail)
        if known is not None:
            contact.last_name = known.last_name
            contact.phone = known.phone
            contact.passport = known.passport
            contact.first_name = known.first_name
            contact.resident_card_number = known.resident_card_number

        if not (contact.url_validated and contact.last_name):
            continue

        # Skip repeats of a combination that has already been kept.
        identity = (contact.mail, contact.phone, contact.passport,
                    contact.last_name, contact.first_name)
        if identity in seen_identities:
            continue
        seen_identities.add(identity)
        contacts_to_save.append(contact)

    # setdefault keeps the first serial seen for a mail, mirroring a linear
    # search that stops at the first match.
    serial_by_mail = {}
    for entry in MONGO_STORE_MANAGER.get_all_contact_serial_list():
        serial_by_mail.setdefault(entry.mail, entry.serial)

    for contact in contacts_to_save:
        if contact.serial in DEFAULT_SERIAL_TO_IGNORE and contact.mail in serial_by_mail:
            contact.serial = serial_by_mail[contact.mail]

    write_new_contacts_to_excel(contacts_to_save, file_name=collection_name)
    write_list_with_segment_number(collection_name, contacts_to_save, segment_number)
def write_list_with_segment_number(file_name, _contact_to_save_list, segment_number):
    """Split contacts by ``source_from`` and export each group to its own workbook.

    Contacts whose ``source_from`` is ``"rdv"``, ``"lpan"`` or ``"panlei"`` go
    to the ``<file_name>_mac`` export; all others go to ``<file_name>_win``.

    Args:
        file_name: Base name; ``"_mac"`` / ``"_win"`` is appended to it.
        _contact_to_save_list: Contacts to split.
        segment_number: Currently unused; the even split it used to drive is
            disabled (kept below for reference).
    """
    mac_sources = ("rdv", "lpan", "panlei")
    mac_contacts = []
    win_contacts = []
    for contact in _contact_to_save_list:
        target = mac_contacts if contact.source_from in mac_sources else win_contacts
        target.append(contact)

    write_new_contacts_to_excel(mac_contacts, file_name=file_name + "_mac")
    write_new_contacts_to_excel(win_contacts, file_name=file_name + "_win")
    # Disabled: even split into `segment_number` files.
    # for i in range(0, segment_number):
    #     _step = int(len(_contact_to_save_list) / segment_number)
    #     _sublist = _contact_to_save_list[i * _step:_step * (i + 1)]
    #     _file_name = file_name + "_" + str(i + 1)
    #     write_new_contacts_to_excel(_sublist, file_name=_file_name)
def merge_contact_list_files(file_list: list, final_file_name="merged_contact_list"):
    """Merge several contact workbooks into one de-duplicated export.

    All contacts get ``store = "random"``. A contact whose serial is listed in
    ``DEFAULT_SERIAL_TO_IGNORE`` receives the serial stored for its mail, when
    there is one. The list sizes before and after de-duplication are printed,
    then the result is written through ``write_new_contacts_to_excel``.

    Args:
        file_list: Paths of the workbooks to read with ``read_contacts``.
        final_file_name: File-name suffix of the merged export.
    """
    merged = []
    for path in file_list:
        merged.extend(read_contacts(path))

    for contact in merged:
        contact.store = "random"

    # setdefault keeps the first serial seen for a mail, mirroring a linear
    # search that stops at the first match.
    serial_by_mail = {}
    for entry in MONGO_STORE_MANAGER.get_all_contact_serial_list():
        serial_by_mail.setdefault(entry.mail, entry.serial)

    for contact in merged:
        # Membership test: DEFAULT_SERIAL_TO_IGNORE is a list, so comparing a
        # serial to it with `==` never matched and no serial was ever replaced.
        if contact.serial in DEFAULT_SERIAL_TO_IGNORE and contact.mail in serial_by_mail:
            contact.serial = serial_by_mail[contact.mail]

    print(len(merged))
    # dict.fromkeys de-duplicates with the same hash/equality rules as set()
    # but keeps first-seen order, so the export order is reproducible.
    # NOTE(review): what counts as a duplicate depends on how the contact
    # class defines equality and hashing - confirm.
    unique_contacts = list(dict.fromkeys(merged))
    print(len(unique_contacts))
    write_new_contacts_to_excel(unique_contacts, file_name=final_file_name)
def generate_all_contact_list():
    """Export every bookable contact, shuffled, with ``store`` set to ``"random"``.

    Writes the complete list under the ``"all"`` name and then the per-source
    split through ``write_list_with_segment_number``.
    """
    export_name = "all"
    every_contact = MONGO_STORE_MANAGER.get_all_contacts_to_book()
    random.shuffle(every_contact)
    for entry in every_contact:
        entry.store = "random"

    write_new_contacts_to_excel(every_contact, file_name=export_name)
    write_list_with_segment_number(export_name, every_contact, 1)
def write_to_black_list(contacts: list):
    """Insert each given contact into the blacklist, one store call per contact.

    Args:
        contacts: Contacts passed one by one to
            ``MONGO_STORE_MANAGER.insert_blacklist_contact``.
    """
    for blacklisted in contacts:
        MONGO_STORE_MANAGER.insert_blacklist_contact(blacklisted)
def update_contact_list_not_received_mail():
    """Blacklist every item of the day whose ``url_validated`` flag is falsy.

    Items come from ``MONGO_STORE_MANAGER.get_all_successful_items_for_day()``
    and are blacklisted one at a time through ``write_to_black_list``.
    """
    for item in MONGO_STORE_MANAGER.get_all_successful_items_for_day():
        if item.url_validated:
            continue
        write_to_black_list([item])
def get_old_validated_contact_list(remove_blacklisted_contact=True, _day_in_str="2024-09-06"):
    """Export the validated contacts of a past day, optionally minus blacklisted ones.

    Items of the ``_day_in_str`` collection get ``store = "random"``. An item
    with an empty ``last_name`` has its name, phone and passport filled in
    from the bookable contact with the same mail. Items are kept when they are
    ``url_validated``, have a non-empty ``last_name``, are not a repeat of an
    already kept (mail, phone, passport, last_name, first_name) combination,
    and their mail domain is not in the excluded-domain list.

    Two workbooks may be written: ``<day>_without_blacklisted`` (only when
    ``remove_blacklisted_contact`` is true) and ``<day>`` with every kept item.

    Args:
        remove_blacklisted_contact: Also write the export that leaves out
            contacts whose mail is blacklisted.
        _day_in_str: Name of the day collection; also the file-name suffix.
    """
    day_items = MONGO_STORE_MANAGER.get_all_successful_items_for_one_day(_day_in_str)
    excluded_domains = ["firemail.de", "onet.pl", "yahoo.com", "gmx.com"]
    # Index by mail instead of rescanning the whole list per item. Later
    # entries overwrite earlier ones, which matches a full scan in which
    # every match overwrites the previous one.
    known_by_mail = {known.mail: known for known in MONGO_STORE_MANAGER.get_all_contacts_to_book()}

    contacts_to_save = []
    seen_identities = set()
    for contact in day_items:
        contact.store = "random"

        if not contact.last_name:
            known = known_by_mail.get(contact.mail)
            if known is not None:
                contact.last_name = known.last_name
                contact.phone = known.phone
                contact.passport = known.passport
                contact.first_name = known.first_name

        if not (contact.url_validated and contact.last_name):
            continue

        # Skip repeats of a combination that has already been kept.
        identity = (contact.mail, contact.phone, contact.passport,
                    contact.last_name, contact.first_name)
        if identity in seen_identities:
            continue

        # Drop the contact when its mail domain is one of the excluded ones.
        if contact.mail.split("@")[1] in excluded_domains:
            continue

        seen_identities.add(identity)
        contacts_to_save.append(contact)

    if remove_blacklisted_contact:
        # Build the blacklisted-mail set once rather than once per contact.
        blacklisted_mails = {entry.mail for entry in MONGO_STORE_MANAGER.get_blacklist_contacts()}
        without_blacklisted = [contact for contact in contacts_to_save
                               if contact.mail not in blacklisted_mails]
        write_new_contacts_to_excel(without_blacklisted, file_name=_day_in_str + "_without_blacklisted")

    write_new_contacts_to_excel(contacts_to_save, file_name=_day_in_str)
# Save the new contacts online
def write_resident_card_number_to_contact_list(file_to_read, file_name="contact_list_all.xlsx"):
    """Copy resident card numbers from the bookable contacts into a workbook's contacts.

    Every contact read from ``file_to_read`` takes the ``resident_card_number``
    of the bookable contact(s) with the same mail (the last match wins). The
    result is exported through ``write_list_with_segment_number``.

    Args:
        file_to_read: Workbook passed to ``read_contacts``.
        file_name: Base name handed to ``write_list_with_segment_number``.
            NOTE(review): it is used as a name prefix, so the default's
            ``.xlsx`` ending lands in the middle of the output file names.
    """
    contacts_to_book = read_contacts(file_to_read)
    known_contacts = MONGO_STORE_MANAGER.get_all_contacts_to_book()

    for booking in contacts_to_book:
        same_mail = (known for known in known_contacts if known.mail == booking.mail)
        for known in same_mail:
            booking.resident_card_number = known.resident_card_number

    write_list_with_segment_number(file_name, contacts_to_book, 1)
if __name__ == "__main__":
    # Manual entry point: only the daily export below is active. The other
    # maintenance calls are kept commented out for reference.
    # write_resident_card_number_to_contact_list(file_to_read=str(Path.home()) + "/Desktop/contact_list_all_13.xlsx",
    #                                            file_name="contact_list_all_13")
    # contacts_to_book = upload_contacts_list()
    # MONGO_STORE_MANAGER.upload_contact_list(contacts_to_book)
    # print("start at {}".format(datetime.datetime.now()))
    generate_valid_contact_list_for_day(segment_number=2)
    # generate_contact_from_mail_list("/Users/lpan/Downloads/邮箱及密码_23_03_25_yahoo.xlsx")
    # print("end at {}".format(datetime.datetime.now()))
    # update_contact_list_not_received_mail()
    # get_old_validated_contact_list()
    # print("end at {}".format(datetime.datetime.now()))
    # generate_all_contact_list()
    # merge_contact_list_files(
    #     ["/Users/lpan/Desktop/contact_list_2024-11-04.xlsx",
    #      "/Users/lpan/Desktop/contact_list_2024-11-07.xlsx",
    #      "/Users/lpan/Desktop/contact_list_2024-11-08.xlsx",
    #      "/Users/lpan/Desktop/contact_list_2024-11-06.xlsx"
    #      ])
    # fix_phone_number_format(str(Path.home()) + "/Desktop/contact_list_2025-03-29_mac.xlsx")