create src module for source code
This commit is contained in:
@@ -0,0 +1,141 @@
|
||||
import json
|
||||
import random
|
||||
import string
|
||||
|
||||
import pandas as pandas
|
||||
import xlsxwriter
|
||||
|
||||
from src.definitions import CONTACT_LIST_FILE
|
||||
from src.pojo.contact_pojo import ContactPojo
|
||||
from src.utils.generate_random_passport_id import get_random_passport_id_number
|
||||
|
||||
phone_number_prefix = ['6']
|
||||
|
||||
|
||||
class ExcelHelper:
|
||||
|
||||
def __init__(self):
|
||||
self._df = pandas.Series()
|
||||
|
||||
def write_to_exel(self, file_name, data_list: list):
|
||||
new_df = pandas.Series(data_list)
|
||||
self._df = pandas.concat([self._df, new_df])
|
||||
self._df.to_excel(file_name)
|
||||
|
||||
def read_contacts(self, file_name=CONTACT_LIST_FILE) -> list:
|
||||
contact_list_in_json = pandas.read_excel(file_name).to_json(orient='records')
|
||||
contact_dict_list = json.loads(contact_list_in_json)
|
||||
contact_list = []
|
||||
for contact_dict in contact_dict_list:
|
||||
if contact_dict['name']:
|
||||
raw_name = contact_dict['name'].strip()
|
||||
name = raw_name.split(' ')
|
||||
last_name = name[0]
|
||||
if len(name) == 2:
|
||||
first_name = name[-1]
|
||||
else:
|
||||
first_name = ''.join(name[1:len(name)])
|
||||
|
||||
contact = ContactPojo(phone_number=contact_dict['phone'],
|
||||
last_name=last_name,
|
||||
first_name=first_name,
|
||||
passport_number=contact_dict['passport'],
|
||||
mail=contact_dict['email'])
|
||||
contact_list.append(contact)
|
||||
return contact_list
|
||||
|
||||
def read_names(self, file_name=CONTACT_LIST_FILE) -> list:
|
||||
contact_list_in_json = pandas.read_excel(file_name).to_json(orient='records')
|
||||
contact_dict_list = json.loads(contact_list_in_json)
|
||||
contact_list = []
|
||||
count = 2
|
||||
for contact_dict in contact_dict_list:
|
||||
if contact_dict['name']:
|
||||
raw_name = contact_dict['name'].strip()
|
||||
name = raw_name.split(' ')
|
||||
if len(name) == 1:
|
||||
name = raw_name.split('\xa0')
|
||||
if len(name) == 1:
|
||||
print("error in " + str(name))
|
||||
last_name = name[0]
|
||||
if len(name) == 2:
|
||||
first_name = name[-1]
|
||||
else:
|
||||
first_name = ''.join(name[1:len(name)])
|
||||
|
||||
contact = ContactPojo(phone_number="",
|
||||
last_name=last_name,
|
||||
first_name=first_name,
|
||||
passport_number="",
|
||||
mail="")
|
||||
|
||||
if len(first_name) == 0:
|
||||
print("first_name is empty: position:" + str(count))
|
||||
print(name)
|
||||
if len(last_name) == 0:
|
||||
print("last_name is empty: position:" + str(count))
|
||||
count = count + 1
|
||||
contact_list.append(contact)
|
||||
|
||||
return contact_list
|
||||
|
||||
|
||||
def get_random_phone_numbers():
|
||||
length = 8 # number of characters in the string.
|
||||
ran = ''.join(random.choices(string.digits, k=length))
|
||||
id_number = random.choice(phone_number_prefix) + str(ran)
|
||||
return id_number
|
||||
|
||||
|
||||
def generate_email_from_name(first_name: str, last_name: str) -> str:
|
||||
length = 2 # number of characters in the string.
|
||||
ran = ''.join(random.choices(string.digits, k=length))
|
||||
separator = ['.', '_', '']
|
||||
domains = ['gmail.com', 'hotmail.com', 'yahoo.com', 'aol.com', 'outlook.com', 'hotmail.fr', 'gmx.com',
|
||||
'hotmail.com', 'yahoo.com', 'aol.com', 'hotmail.com']
|
||||
email = "{}{}{}{}@{}".format(last_name.lower(), random.choice(separator),
|
||||
first_name.replace("-", "").replace("'", "").lower(), ran,
|
||||
random.choice(domains))
|
||||
print(email)
|
||||
return email
|
||||
|
||||
|
||||
def get_random_id_number() -> str:
|
||||
# write_the_valid_profiles_to_excel()
|
||||
S = 8 # number of characters in the string.
|
||||
# call random.choices() string module to find the string in Uppercase + numeric data.
|
||||
ran = ''.join(random.choices(string.digits, k=S))
|
||||
print("The randomly generated string is : 94" + str(ran)) # print the random data
|
||||
return ran
|
||||
|
||||
|
||||
def write_new_contacts_to_excel(valid_contacts: list):
|
||||
row = 0
|
||||
col = 0
|
||||
# Create a workbook and add a worksheet.
|
||||
workbook = xlsxwriter.Workbook('new_profile_{}.xlsx'.format(len(valid_contacts)))
|
||||
header_data = ['name', 'phone', 'passport', 'email']
|
||||
worksheet = workbook.add_worksheet()
|
||||
header_format = workbook.add_format({'bold': True})
|
||||
|
||||
for col_num, data in enumerate(header_data):
|
||||
worksheet.write(row, col_num, data, header_format)
|
||||
row = row + 1
|
||||
for info in valid_contacts:
|
||||
info.phone = get_random_phone_numbers()
|
||||
info.passport = get_random_passport_id_number()
|
||||
info.mail = generate_email_from_name(info.first_name, info.last_name)
|
||||
# Iterate over the data and write it out row by row.
|
||||
worksheet.write(row, col, "{} {}".format(info.last_name, info.first_name))
|
||||
worksheet.write(row, col + 1, info.phone)
|
||||
worksheet.write(row, col + 2, info.passport)
|
||||
worksheet.write(row, col + 3, info.mail)
|
||||
row += 1
|
||||
workbook.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
excel_reader = ExcelHelper()
|
||||
contacts = excel_reader.read_names("C:/Users/landd/Desktop/rdv/08-06/500.xlsx")
|
||||
print(contacts)
|
||||
write_new_contacts_to_excel(valid_contacts=contacts)
|
||||
@@ -0,0 +1,73 @@
|
||||
import random
|
||||
import string
|
||||
|
||||
import xlsxwriter
|
||||
|
||||
import params
|
||||
from pojo.ReserveResultPojo import ReserveResultPojo
|
||||
from pojo.contact_pojo import ContactPojo
|
||||
|
||||
|
||||
def get_random_id_number() -> str:
|
||||
# write_the_valid_profiles_to_excel()
|
||||
S = 8 # number of characters in the string.
|
||||
# call random.choices() string module to find the string in Uppercase + numeric data.
|
||||
ran = ''.join(random.choices(string.digits, k=S))
|
||||
id_number = "57" + str(ran)
|
||||
print("The randomly generated string is : 94" + str(ran)) # print the random data
|
||||
return id_number
|
||||
|
||||
|
||||
def write_the_valid_profiles_to_excel():
|
||||
day_list = ['2022-06-02']
|
||||
|
||||
collection = []
|
||||
for day in day_list:
|
||||
collection.extend(params.firebase_store_manager.get_all_successful_items_for_day(day, source_from=None).stream())
|
||||
|
||||
valid_contacts = []
|
||||
# exist_contacts = ExcelHelper().read_contacts()
|
||||
for valid_appointment in collection:
|
||||
reserve_pojo = ReserveResultPojo.from_firestore_dict(valid_appointment.to_dict())
|
||||
# check whether the contact exists already
|
||||
# exist = [contact for contact in valid_contacts if contact.mail == reserve_pojo.email]
|
||||
# if len(exist) == 0:
|
||||
contact = ContactPojo(reserve_pojo.phone, passport_number=get_random_id_number(),
|
||||
last_name=reserve_pojo.lastName, first_name=reserve_pojo.firstName, ccid="",
|
||||
mail=reserve_pojo.email, position=0)
|
||||
# seed = 8 # number of characters in the string.
|
||||
# call random.choices() string module to find the string in Uppercase + numeric data.
|
||||
# contact.passport = get_random_id_number()
|
||||
# if contact.passport == None or len(contact.passport) == 0:
|
||||
# old_contact = [item for item in exist_contacts if item.mail == contact.mail]
|
||||
# if len(old_contact) > 0:
|
||||
# contact.passport = old_contact[0].passport
|
||||
|
||||
print(contact)
|
||||
valid_contacts.append(contact)
|
||||
|
||||
row = 0
|
||||
col = 0
|
||||
# Create a workbook and add a worksheet.
|
||||
workbook = xlsxwriter.Workbook('valid_profile.xlsx')
|
||||
header_data = ['name', 'phone', 'passport', 'email', 'ccid', 'position']
|
||||
worksheet = workbook.add_worksheet()
|
||||
header_format = workbook.add_format({'bold': True})
|
||||
|
||||
for col_num, data in enumerate(header_data):
|
||||
worksheet.write(row, col_num, data, header_format)
|
||||
row = row + 1
|
||||
for info in valid_contacts:
|
||||
# Iterate over the data and write it out row by row.
|
||||
worksheet.write(row, col, "{} {}".format(info.last_name, info.first_name))
|
||||
worksheet.write(row, col + 1, info.phone)
|
||||
worksheet.write(row, col + 2, info.passport)
|
||||
worksheet.write(row, col + 3, info.mail)
|
||||
worksheet.write(row, col + 4, info.ccid)
|
||||
row += 1
|
||||
workbook.close()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# get_random_id_number()
|
||||
write_the_valid_profiles_to_excel()
|
||||
@@ -0,0 +1,43 @@
|
||||
import random
|
||||
import string
|
||||
|
||||
|
||||
# letters = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'J', 'K', 'P', 'Q', 'R', 'S', 'T', '1', '2', '3', '4', '5', '6', '7',
|
||||
# '8', '9']
|
||||
from src.pojo.captcha_error_contact_pojo import ContactInErrorPojo
|
||||
from src.pojo.contact_pojo import ContactPojo
|
||||
|
||||
letters = ['E', 'G', 'M']
|
||||
|
||||
|
||||
def get_random_id_number() -> str:
|
||||
S = 8 # number of characters in the string.
|
||||
ran = ''.join(random.choices(string.digits, k=S))
|
||||
id_number = str(ran)
|
||||
return id_number
|
||||
|
||||
# 产生中国护照号
|
||||
def get_random_passport_id_number() -> str:
|
||||
S = 8 # number of characters in the string.
|
||||
ran = ''.join(random.choices(string.digits, k=S))
|
||||
id_number = random.choice(letters) + str(ran)
|
||||
return id_number
|
||||
|
||||
|
||||
def get_captcha_error_contact_from_contact(contact: ContactPojo, error_type: int) -> ContactInErrorPojo:
|
||||
captcha_error = ContactInErrorPojo()
|
||||
captcha_error.mail = contact.mail
|
||||
captcha_error.ccid = contact.ccid
|
||||
captcha_error.phone = contact.phone
|
||||
captcha_error.passport = contact.passport
|
||||
captcha_error.first_name = contact.first_name
|
||||
captcha_error.last_name = contact.last_name
|
||||
captcha_error.position = contact.position
|
||||
captcha_error.error_type = error_type
|
||||
return captcha_error
|
||||
|
||||
if __name__ == '__main__':
|
||||
# for i in range(1,200):
|
||||
# print(get_random_id_number())
|
||||
for i in range(1, 804):
|
||||
print(get_random_id_number())
|
||||
@@ -0,0 +1,22 @@
|
||||
import threading
|
||||
|
||||
from params import rabittmq_connection
|
||||
|
||||
APPOINTMENT_QUEUE = "APPOINTMENT_QUEUE"
|
||||
|
||||
|
||||
class MessageReceiver:
|
||||
|
||||
def start_listener(self, callback):
|
||||
t = threading.Thread(target=self._run, args=(callback,))
|
||||
t.start()
|
||||
|
||||
def _run(self, callback):
|
||||
|
||||
channel = rabittmq_connection.channel()
|
||||
channel.queue_declare(queue=APPOINTMENT_QUEUE)
|
||||
channel.basic_consume(queue=APPOINTMENT_QUEUE,
|
||||
auto_ack=True,
|
||||
on_message_callback=callback)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
@@ -0,0 +1,19 @@
|
||||
import json
|
||||
|
||||
import requests
|
||||
|
||||
|
||||
def generate_names() -> list:
|
||||
res = requests.get(
|
||||
"https://www.namegeneratorfun.com/api/namegenerator?generatorType=list&firstName=&lastName=&minLength=0&maxLength=255&sexId=2&generatorId=43")
|
||||
response_json = json.loads(res.text)
|
||||
names = response_json['names']
|
||||
# print(names)
|
||||
return names
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
for i in range(0, 500):
|
||||
names = generate_names()
|
||||
for name in names:
|
||||
print(name)
|
||||
Reference in New Issue
Block a user