Files
appointment_tool/src/person_name/contact_manager.py
T

374 lines
18 KiB
Python
Executable File

import datetime
import math
import random
from pathlib import Path
import xlsxwriter
from src.db.mongo_manager import MONGO_STORE_MANAGER
from src.person_name.extract_name_with_pinyinlist import filter_already_validated_contacts, read_pinyin_list_from_file
from src.pojo.contact_pojo import ContactPojo
from src.utils.contacts.generate_random_passport_id import generate_single_titre_sejour_number, \
get_random_passport_id_number
from src.utils.excel_reader import read_contacts, fr_phone_number_prefix, get_random_fr_phone_numbers, ExcelHelper
# Serial values treated as placeholders: generate_valid_contact_list_for_day replaces a
# contact's serial with the one returned by MONGO_STORE_MANAGER.get_all_contact_serial_list()
# (matched on mail) whenever the contact's current serial is one of these values.
DEFAULT_SERIAL_TO_IGNORE = ["47e7e36b", "bitbrowser"]
def upload_contacts_list(file_path=None):
    """Read the contacts to book from an Excel workbook.

    Args:
        file_path: Path of the workbook to read. When omitted, the historical
            default ``~/Desktop/contact_list_2026-04-11_FIXED.xlsx`` is used,
            so existing zero-argument callers behave as before.

    Returns:
        Whatever ``read_contacts`` returns for that workbook.
    """
    if file_path is None:
        file_path = str(Path.home()) + "/Desktop/contact_list_2026-04-11_FIXED.xlsx"
    return read_contacts(file_path)
def fix_phone_number_format(file_path, output_file_name="real_name_contacts_500_27_03_25_fixed"):
    """Repair names and phone numbers of the contacts in a workbook and write a fixed copy.

    For every contact read from ``file_path``:

    * if the first name is missing, the last name (with non-breaking spaces
      turned into regular spaces) is split on spaces; the first token becomes
      the last name and the second token, when present, the first name;
    * if the phone starts with ``'7'`` but with none of the prefixes in
      ``fr_phone_number_prefix``, it is replaced by a value from
      ``get_random_fr_phone_numbers()``.

    The resulting list is passed to ``write_new_contacts_to_excel``.

    Args:
        file_path: Workbook to read with ``read_contacts``.
        output_file_name: ``file_name`` forwarded to
            ``write_new_contacts_to_excel``; defaults to the previously
            hard-coded value.
    """
    _contact_list = read_contacts(file_path)
    # str.startswith accepts a tuple, so the prefixes are tested in one call.
    _known_prefixes = tuple(fr_phone_number_prefix)

    for _contact in _contact_list:
        if not _contact.first_name:
            print(_contact)
            if _contact.last_name:
                _name_parts = _contact.last_name.replace("\xa0", " ").split(" ")
                _contact.last_name = _name_parts[0]
                # A name without a space has no second token; leave the first
                # name untouched instead of raising IndexError.
                if len(_name_parts) > 1:
                    _contact.first_name = _name_parts[1]
            print(_contact)

        if _contact.phone.startswith('7') and not _contact.phone.startswith(_known_prefixes):
            print(_contact)
            _contact.phone = get_random_fr_phone_numbers()

    write_new_contacts_to_excel(_contact_list, file_name=output_file_name)
def generate_contact_from_mail_list(mail_list_file,
                                    name_list_file_path="all_new_name_list.txt"):
    """Build one randomly populated contact per mail entry and export them to Excel.

    The mail entries come from ``ExcelHelper.read_mails_and_pwd`` and are passed
    through ``filter_already_validated_contacts`` (its return value is not used
    here). Each remaining entry gets a random phone number, passport number,
    resident card number and a "LAST FIRST" name picked from the name list file.

    Args:
        mail_list_file: Workbook holding the mail entries.
        name_list_file_path: File read by ``read_pinyin_list_from_file``.
    """
    mail_list = ExcelHelper().read_mails_and_pwd(mail_list_file)
    print("mail_list size is {}".format(len(mail_list)))
    filter_already_validated_contacts(mail_list)
    print("mail_list size after filter is {}".format(len(mail_list)))

    pinyin_name_list = read_pinyin_list_from_file(name_list_file_path)
    random.shuffle(pinyin_name_list)
    print(pinyin_name_list[0])

    generate_contacts = []
    for mail_entry in mail_list:
        # Keep this call order: each generator draws from random state.
        random_phone = get_random_fr_phone_numbers()
        random_passport = get_random_passport_id_number()
        random_card = generate_single_titre_sejour_number()
        name_parts = random.choice(pinyin_name_list).split(" ")

        new_contact = ContactPojo(
            mail=mail_entry.mail,
            phone_number=random_phone,
            passport=random_passport,
            last_name=name_parts[0],
            first_name=name_parts[1],
            store="random",
        )
        new_contact.resident_card_number = random_card
        generate_contacts.append(new_contact)

    write_new_contacts_to_excel(generate_contacts)
def write_new_contacts_to_excel(valid_contacts: list, file_name=None):
    """Write contacts to the workbook ``contact_list_<file_name>.xlsx``.

    The file name is relative, so the workbook is created wherever relative
    paths resolve for the running process. The first row holds bold column
    headers; each contact then takes one row.

    Args:
        valid_contacts: Objects exposing ``last_name``, ``first_name``,
            ``phone``, ``passport``, ``mail``, ``store``, ``serial``,
            ``ip_country``, ``ua``, ``resident_card_number`` and
            ``source_from``.
        file_name: Suffix used in the output file name. Defaults to today's
            date, resolved when the function is called.
    """
    if file_name is None:
        # A default written in the signature is evaluated once at import time,
        # which would freeze the date for the lifetime of the process.
        file_name = str(datetime.date.today())

    workbook = xlsxwriter.Workbook('contact_list_{}.xlsx'.format(file_name), {'nan_inf_to_errors': True})
    header_data = ['name', 'phone', 'passport', 'email', 'store', 'serial', 'ip_country', 'ua',
                   'resident_card_number', 'source_from']
    worksheet = workbook.add_worksheet()
    header_format = workbook.add_format({'bold': True})
    for col_num, data in enumerate(header_data):
        worksheet.write(0, col_num, data, header_format)

    def safe_write(row_num, col_num, value):
        """Write ``value``, replacing NaN/INF (numeric or textual) and None with an empty cell."""
        try:
            if isinstance(value, str):
                lowered = value.lower()
                # NOTE(review): this substring match also blanks ordinary text
                # that merely contains "nan" or "inf"; behavior kept as-is.
                is_blank = 'nan' in lowered or 'inf' in lowered
            elif isinstance(value, (int, float)):
                is_blank = math.isnan(value) or math.isinf(value)
            else:
                is_blank = value is None
            worksheet.write(row_num, col_num, "" if is_blank else value)
        except (TypeError, ValueError):
            # Fall back to an empty cell when the value cannot be written.
            worksheet.write(row_num, col_num, "")

    # Data rows start right below the header row.
    for row, info in enumerate(valid_contacts, start=1):
        worksheet.write(row, 0, "{} {}".format(info.last_name, info.first_name))
        worksheet.write(row, 1, info.phone)
        worksheet.write(row, 2, info.passport)
        worksheet.write(row, 3, info.mail)
        worksheet.write(row, 4, info.store)
        worksheet.write(row, 5, info.serial)
        worksheet.write(row, 6, info.ip_country)
        # Only the user-agent column goes through the NaN/INF filter.
        safe_write(row, 7, info.ua)
        worksheet.write(row, 8, info.resident_card_number)
        worksheet.write(row, 9, info.source_from)

    workbook.close()
def generate_valid_contact_list_for_day(segment_number=1, collection_name="2026-04-11"):
    """Export the validated contacts of one day's collection to Excel files.

    Steps:

    1. Load the day's successful items and the full list of contacts to book.
    2. For each item, copy name, phone, passport and ``source_from`` from every
       contact-to-book with the same mail (a later match overwrites an earlier
       one).
    3. Keep items that are URL-validated, have a last name, are not
       outlook.com / hotmail.com addresses and do not duplicate an already kept
       item (same mail, phone, passport, last name and first name).
    4. Replace placeholder serials (``DEFAULT_SERIAL_TO_IGNORE``) with the
       serial stored for the same mail, when one exists.
    5. Write the result with ``write_new_contacts_to_excel`` and
       ``write_list_with_segment_number``.

    Args:
        segment_number: Forwarded to ``write_list_with_segment_number``.
        collection_name: Day collection to read, also used as the output file
            name. Defaults to the previously hard-coded value.
    """
    _valid_contact_list = MONGO_STORE_MANAGER.get_all_successful_items_for_one_day(collection_name)
    _all_contacts = MONGO_STORE_MANAGER.get_all_contacts_to_book()
    _contact_to_save = []

    for _contact in _valid_contact_list:
        _contact.store = "random"

        for _true_contact in _all_contacts:
            if _true_contact.mail != _contact.mail:
                continue
            _contact.last_name = _true_contact.last_name
            _contact.phone = _true_contact.phone
            _contact.passport = str(_true_contact.passport)[:9]
            _contact.first_name = _true_contact.first_name
            # NOTE(review): the resident card number is filled from the passport
            # value, not from _true_contact.resident_card_number - confirm this
            # is intended.
            _contact.resident_card_number = str(_true_contact.passport)[:9]
            # Leftover trace for one specific address; output kept unchanged.
            if _contact.mail == "angielovato14903@yahoo.com":
                print("no resident card number for " + _contact.mail)
            print("{}:{}".format(_true_contact.mail, _true_contact.source_from))
            if isinstance(_true_contact.source_from, str) and len(_true_contact.source_from) > 0:
                print(_true_contact.source_from)
                _contact.source_from = _true_contact.source_from
            else:
                print("no source from for " + _contact.mail)

        if not _contact.url_validated:
            continue
        if _contact.last_name is None or len(_contact.last_name) == 0:
            continue
        if "outlook.com" in _contact.mail or "hotmail.com" in _contact.mail:
            continue

        # Skip items identical to one that has already been kept.
        _is_duplicate = any(
            _added_item.mail == _contact.mail
            and _added_item.phone == _contact.phone
            and _added_item.passport == _contact.passport
            and _added_item.last_name == _contact.last_name
            and _added_item.first_name == _contact.first_name
            for _added_item in _contact_to_save
        )
        if not _is_duplicate:
            _contact_to_save.append(_contact)

    _contact_serial_map = MONGO_STORE_MANAGER.get_all_contact_serial_list()
    for contact in _contact_to_save:
        if contact.serial not in DEFAULT_SERIAL_TO_IGNORE:
            continue
        for _contact_serial in _contact_serial_map:
            if _contact_serial.mail == contact.mail:
                contact.serial = _contact_serial.serial
                break

    write_new_contacts_to_excel(_contact_to_save, file_name=collection_name)
    write_list_with_segment_number(collection_name, _contact_to_save, segment_number)
def write_list_with_segment_number(file_name, _contact_to_save_list, segment_number):
    """Write the contacts split by source and split into consecutive segments.

    Files are produced through ``write_new_contacts_to_excel``:

    * ``<file_name>_win``: contacts whose ``source_from`` is ``"panleicim"``;
    * ``<file_name>_mac``: all other contacts;
    * ``<file_name>_1`` .. ``<file_name>_<segment_number>``: consecutive slices
      of the full list.

    Args:
        file_name: Base name for every output file.
        _contact_to_save_list: Sequence of contacts (must support ``len`` and
            slicing).
        segment_number: Number of segment files. No segment file is written
            when it is zero or negative.
    """
    _mac_list = []
    _win_list = []
    for _contact in _contact_to_save_list:
        if _contact.source_from == "panleicim":
            _win_list.append(_contact)
        else:
            _mac_list.append(_contact)
    write_new_contacts_to_excel(_mac_list, file_name=file_name + "_mac")
    write_new_contacts_to_excel(_win_list, file_name=file_name + "_win")

    if segment_number <= 0:
        return

    _total = len(_contact_to_save_list)
    _step = _total // segment_number
    for i in range(segment_number):
        _start = i * _step
        # The last segment absorbs the remainder; with a fixed step the
        # trailing (_total % segment_number) contacts were silently dropped.
        _end = _total if i == segment_number - 1 else _start + _step
        _sublist = _contact_to_save_list[_start:_end]
        write_new_contacts_to_excel(_sublist, file_name=file_name + "_" + str(i + 1))
def merge_contact_list_files(file_list: list, final_file_name="merged_contact_list"):
    """Merge several contact workbooks into one de-duplicated workbook.

    Every contact gets ``store = "random"``; placeholder serials
    (``DEFAULT_SERIAL_TO_IGNORE``) are replaced with the serial stored for the
    same mail when one exists. Duplicates are removed with ``set``, so
    de-duplication relies on the contact type's equality/hash and the output
    order is not the input order.

    Args:
        file_list: Paths of the workbooks to merge. A single path string is
            accepted and treated as a one-element list.
        final_file_name: ``file_name`` forwarded to
            ``write_new_contacts_to_excel``.
    """
    if isinstance(file_list, str):
        # Iterating a bare string would read one "file" per character.
        file_list = [file_list]

    _all_contact_list = []
    for file in file_list:
        _all_contact_list.extend(read_contacts(file))
    for _con in _all_contact_list:
        _con.store = "random"

    _contact_serial_map = MONGO_STORE_MANAGER.get_all_contact_serial_list()
    for contact in _all_contact_list:
        # Membership test: comparing the serial to the whole list with == was
        # always False, so placeholder serials were never replaced.
        if contact.serial in DEFAULT_SERIAL_TO_IGNORE:
            for _contact_serial in _contact_serial_map:
                if _contact_serial.mail == contact.mail:
                    contact.serial = _contact_serial.serial
                    break

    print(len(_all_contact_list))
    _list_without_duplicate = list(set(_all_contact_list))
    print(len(_list_without_duplicate))
    write_new_contacts_to_excel(_list_without_duplicate, file_name=final_file_name)
def generate_all_contact_list():
    """Export every contact to book, in random order, under the base name ``all``.

    Each contact's store is set to ``"random"`` and both its passport and its
    resident card number are set to the first nine characters of the resident
    card number's string form. The list is then written through
    ``write_new_contacts_to_excel`` and ``write_list_with_segment_number``
    (one segment).
    """
    every_contact = MONGO_STORE_MANAGER.get_all_contacts_to_book()
    random.shuffle(every_contact)

    for entry in every_contact:
        entry.store = "random"
        # Passport and resident card number deliberately share this value.
        truncated_card = str(entry.resident_card_number)[:9]
        entry.passport = truncated_card
        entry.resident_card_number = truncated_card

    base_name = "all"
    write_new_contacts_to_excel(every_contact, file_name=base_name)
    write_list_with_segment_number(base_name, every_contact, 1)
def write_to_black_list(contacts: list):
    """Insert each contact of ``contacts`` into the blacklist store, in order."""
    for blacklisted in contacts:
        MONGO_STORE_MANAGER.insert_blacklist_contact(blacklisted)
def update_contact_list_not_received_mail():
    """Blacklist every successful item whose ``url_validated`` flag is falsy."""
    for entry in MONGO_STORE_MANAGER.get_all_successful_items_for_day():
        if entry.url_validated:
            continue
        write_to_black_list([entry])
def get_old_validated_contact_list(remove_blacklisted_contact=True, _day_in_str="2024-09-06"):
    """Export the validated contacts of a past day, optionally without blacklisted ones.

    Items lacking a last name are completed (name, phone, passport) from the
    contacts to book with the same mail. An item is kept when it is
    URL-validated, has a last name, does not duplicate an already kept item and
    its mail domain is not one of the excluded domains.

    Two workbooks may be written through ``write_new_contacts_to_excel``:
    ``<_day_in_str>_without_blacklisted`` (only when
    ``remove_blacklisted_contact`` is true) and ``<_day_in_str>`` (always, with
    the unfiltered kept list).

    Args:
        remove_blacklisted_contact: Also write the list filtered against the
            blacklist.
        _day_in_str: Day collection to read, also used in the output names.
    """
    _valid_contact_list = MONGO_STORE_MANAGER.get_all_successful_items_for_one_day(_day_in_str)
    _domain_list_to_remove = ["firemail.de", "onet.pl", "yahoo.com", "gmx.com"]
    _all_contacts = MONGO_STORE_MANAGER.get_all_contacts_to_book()
    _contact_to_save = []

    for _contact in _valid_contact_list:
        _contact.store = "random"

        if _contact.last_name is None or len(_contact.last_name) == 0:
            for _true_contact in _all_contacts:
                if _true_contact.mail == _contact.mail:
                    _contact.last_name = _true_contact.last_name
                    _contact.phone = _true_contact.phone
                    _contact.passport = _true_contact.passport
                    _contact.first_name = _true_contact.first_name

        if not _contact.url_validated:
            continue
        if _contact.last_name is None or len(_contact.last_name) == 0:
            continue

        # Skip items identical to one that has already been kept.
        _is_duplicate = any(
            _added_item.mail == _contact.mail
            and _added_item.phone == _contact.phone
            and _added_item.passport == _contact.passport
            and _added_item.last_name == _contact.last_name
            and _added_item.first_name == _contact.first_name
            for _added_item in _contact_to_save
        )
        if _is_duplicate:
            continue

        # Drop the contact when its domain IS in the exclusion list. A mail
        # without "@" has no domain and is kept instead of raising IndexError.
        _mail_parts = _contact.mail.split("@")
        _domain = _mail_parts[1] if len(_mail_parts) > 1 else ""
        if _domain not in _domain_list_to_remove:
            _contact_to_save.append(_contact)

    if remove_blacklisted_contact:
        # Build the lookup once instead of rebuilding a list for every contact.
        _blacklisted_mails = {bl.mail for bl in MONGO_STORE_MANAGER.get_blacklist_contacts()}
        _list_without_blacklisted = [contact for contact in _contact_to_save
                                     if contact.mail not in _blacklisted_mails]
        write_new_contacts_to_excel(_list_without_blacklisted, file_name=_day_in_str + "_without_blacklisted")

    write_new_contacts_to_excel(_contact_to_save, file_name=_day_in_str)
# Save the new contacts online
def write_resident_card_number_to_contact_list(file_to_read, file_name="contact_list_all.xlsx"):
    """Copy stored resident card numbers onto the contacts of a workbook and re-export them.

    Each contact read from ``file_to_read`` receives the resident card number
    of every stored contact-to-book with the same mail (the last match wins).
    The list is then handed to ``write_list_with_segment_number`` with a single
    segment.

    Args:
        file_to_read: Workbook read with ``read_contacts``.
        file_name: Base name forwarded to ``write_list_with_segment_number``.
            NOTE(review): the default already ends in ``.xlsx`` and the writer
            appends its own prefix and extension - confirm the resulting file
            names are the intended ones.
    """
    workbook_contacts = read_contacts(file_to_read)
    stored_contacts = MONGO_STORE_MANAGER.get_all_contacts_to_book()

    for entry in workbook_contacts:
        for stored in stored_contacts:
            if stored.mail != entry.mail:
                continue
            entry.resident_card_number = stored.resident_card_number

    write_list_with_segment_number(file_name, workbook_contacts, 1)
def check_resident_card_number(file_path):
    """Validate and repair the resident card numbers of a contact_list workbook.

    A resident card number is valid when its string form is exactly nine
    digits. Each contact with an invalid value is printed and receives a new
    value from ``generate_single_titre_sejour_number()``. When at least one
    value was repaired, all contacts are written, with the usual column
    layout, to a new workbook next to the input whose name is the input's
    stem plus ``_FIXED``. When everything is valid, no file is created.

    Args:
        file_path: Workbook read with ``read_contacts``.
    """
    contacts = read_contacts(file_path)

    repaired_any = False
    for entry in contacts:
        card_text = "" if entry.resident_card_number is None else str(entry.resident_card_number)
        if len(card_text) == 9 and card_text.isdigit():
            continue
        print(entry)
        entry.resident_card_number = generate_single_titre_sejour_number()
        repaired_any = True

    if not repaired_any:
        print("[OK] Tous les resident_card_number sont valides (9 chiffres). Aucun fichier créé.")
        return

    # Output path: same folder, original stem + _FIXED + original extension.
    source_path = Path(file_path)
    output_file = str(source_path.parent / (source_path.stem + "_FIXED" + source_path.suffix))

    # Same layout as write_new_contacts_to_excel: bold header row, one row per contact.
    workbook = xlsxwriter.Workbook(output_file, {'nan_inf_to_errors': True})
    header_data = ['name', 'phone', 'passport', 'email', 'store', 'serial', 'ip_country', 'ua',
                   'resident_card_number', 'source_from']
    worksheet = workbook.add_worksheet()
    header_format = workbook.add_format({'bold': True})
    for header_col, header_text in enumerate(header_data):
        worksheet.write(0, header_col, header_text, header_format)

    def safe_write_val(row_num, col_num, value):
        """Write ``value``; NaN/INF (numeric or exact text) and None become an empty cell."""
        try:
            if isinstance(value, str):
                is_blank = value.lower() in ['nan', 'inf', '-inf']
            elif isinstance(value, (int, float)):
                is_blank = math.isnan(value) or math.isinf(value)
            else:
                is_blank = value is None
            worksheet.write(row_num, col_num, "" if is_blank else value)
        except (TypeError, ValueError):
            worksheet.write(row_num, col_num, "")

    for row, info in enumerate(contacts, start=1):
        worksheet.write(row, 0, "{} {}".format(info.last_name, info.first_name))
        worksheet.write(row, 1, info.phone)
        worksheet.write(row, 2, info.passport)
        worksheet.write(row, 3, info.mail)
        worksheet.write(row, 4, info.store)
        worksheet.write(row, 5, info.serial)
        worksheet.write(row, 6, info.ip_country)
        safe_write_val(row, 7, info.ua)
        worksheet.write(row, 8, info.resident_card_number)
        worksheet.write(row, 9, info.source_from)

    workbook.close()
    print("Fichier corrigé écrit dans : " + output_file)
if __name__ == '__main__':
    # Active task: read the default contact workbook and upload it to the store.
    MONGO_STORE_MANAGER.upload_contact_list(upload_contacts_list())

    # Other manual tasks, kept commented out; enable the one that is needed.
    # write_resident_card_number_to_contact_list(file_to_read=str(Path.home()) + "/Desktop/contact_list_all_13.xlsx",
    #                                            file_name="contact_list_all_13")
    # print("start at {}".format(datetime.datetime.now()))
    # generate_valid_contact_list_for_day(segment_number=2)
    # generate_contact_from_mail_list("/Users/panlei/Downloads/100_yahoo_11_04.xlsx")
    # print("end at {}".format(datetime.datetime.now()))
    # update_contact_list_not_received_mail()
    # get_old_validated_contact_list()
    # print("end at {}".format(datetime.datetime.now()))
    # generate_all_contact_list()
    # merge_contact_list_files([
    #     "/Users/lpan/Desktop/contact_list_2024-11-06.xlsx"
    # ])
    # check_resident_card_number(str(Path.home()) + "/Desktop/contact_list_2026-04-11_FIXED.xlsx")
    # fix_phone_number_format(str(Path.home()) + "/Desktop/gmx_ch_100_2024-06-13.xlsx")