Files
appointment_request/db/mongo_manager.py

295 lines
10 KiB
Python
Executable File

import datetime
import logging
import time
import os
from typing import Optional
from pymongo import MongoClient
from models.AcceptedResultPojo import AcceptedResultPojo
from models.LinkPojo import LinkPojo
from models.ReserveResultPojo import ReserveResultPojo
from models.contact_pojo import ContactPojo
from models.mail_pojo import MailAddress
from models.regisered_user_pojo import RegisteredUserPojo
# Connection string handed to MongoClient; timeoutMS is expressed in milliseconds.
MONGO_DB_URL = "mongodb://mongo2.lpaconsulting.fr/?timeoutMS=100000"

# Collection names (and one collection-name prefix) inside the database.
# NOTE(review): CAPTCHA_ERROR_COLLECTION_PREFIX, BLACK_LIST, EMAIL_LIST and
# INVALID_EMAIL_LIST are not referenced by MongoDbManager in this module;
# presumably they are imported by other modules - confirm before removing.
CAPTCHA_ERROR_COLLECTION_PREFIX = "CAPTCHA_ERROR_"
BLACK_LIST = "BLACK_LIST"
ACCEPTED_APPOINTMENT_LIST = "ACCEPTED_APPOINTMENT_LIST"
CONTACT_LIST_TO_BOOK = "CONTACT_LIST_TO_BOOK"
EMAIL_LIST = "EMAIL_LIST"
DESTINATION_EMAIL_LIST = "DESTINATION_EMAIL_LIST"
LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE"
INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST"
CONTACT_LIST_SERIAL_MAP = "CONTACT_LIST_SERIAL_MAP"
MAIL_READ_LOG = "MAIL_READ_LOG" # records the last time each mailbox was read
class MongoDbManager:
    """Data-access helper around the ``appointment`` MongoDB database.

    Each method reads from or writes to one collection of ``self.db``.
    Collections are either named by the module-level constants or, for
    per-day results, by the ISO date string (``str(datetime.date)``).
    """

    def __init__(self):
        """Create the MongoDB client from credentials found in the environment.

        Raises:
            ValueError: If ``MONGO_USERNAME`` or ``MONGO_PASSWORD`` is unset
                or empty.
        """
        # Credentials are never hard-coded; they come from the environment.
        mongo_username = os.getenv("MONGO_USERNAME")
        mongo_password = os.getenv("MONGO_PASSWORD")
        # Fail fast with a clear message instead of a late authentication error.
        if not mongo_username or not mongo_password:
            raise ValueError(
                "MONGO_USERNAME and MONGO_PASSWORD environment variables must be set"
            )
        client = MongoClient(
            MONGO_DB_URL,
            username=mongo_username,
            password=mongo_password,
            authSource="appointment",
        )
        self.db = client.appointment
        self.logger = logging.getLogger("mongoDb")

    def insert_one(self, collection_name: str, dict: dict):
        """Insert the document ``dict`` into ``collection_name``.

        The parameter name shadows the builtin ``dict``; it is kept unchanged
        so that existing keyword callers keep working. Errors raised by the
        driver propagate to the caller.
        """
        self.db[collection_name].insert_one(dict)

    def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo):
        """Upsert ``reserve`` into ``collection_name`` keyed by ``reserve.id``.

        Best effort: any exception is logged (with traceback) and swallowed.
        """
        try:
            self.db[collection_name].replace_one(
                filter={"_id": reserve.id},
                replacement=reserve.to_firestore_dict(),
                upsert=True,
            )
        except Exception:
            # Logged at ERROR level so the failure is visible even with the
            # default logging configuration.
            self.logger.exception(
                "insert_reserve_result failed for collection %s", collection_name
            )

    def get_all_successful_items_for_day(self) -> list:
        """Return every document of today's collection as ``ReserveResultPojo``."""
        return self.get_all_successful_items_for_one_day(str(datetime.date.today()))

    def count_all_successful_items_for_day(self, day: str) -> int:
        """Return the number of documents stored in the collection named ``day``."""
        return self.db[day].count_documents({})

    def get_all_successful_items_for_yesterday(self) -> list:
        """Return every document of yesterday's collection as ``ReserveResultPojo``."""
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        return self.get_all_successful_items_for_one_day(str(yesterday))

    def get_all_successful_items_for_one_day(self, day_in_str: str) -> list:
        """Return every document of the collection ``day_in_str``.

        Args:
            day_in_str: Collection name; the sibling helpers pass an ISO date
                string such as ``str(datetime.date.today())``.

        Returns:
            A list of ``ReserveResultPojo`` built with ``from_firestore_dict``.
        """
        return [
            ReserveResultPojo.from_firestore_dict(document)
            for document in self.db[day_in_str].find()
        ]

    def get_all_contact_to_book_list(self) -> list:
        """Return all ``CONTACT_LIST_TO_BOOK`` documents as ``ContactPojo``."""
        return [
            ContactPojo.from_firestore_dict(document)
            for document in self.db[CONTACT_LIST_TO_BOOK].find()
        ]

    def get_all_contact_serial_list(self) -> list:
        """Return all ``CONTACT_LIST_SERIAL_MAP`` documents as ``ContactPojo``."""
        return [
            ContactPojo.from_firestore_dict(document)
            for document in self.db[CONTACT_LIST_SERIAL_MAP].find()
        ]

    def save_links_to_validate(
        self,
        link: str,
        mail_address: str,
        model: str,
        _all_contact_list: list,
        _used_ip: str = "",
    ):
        """Upsert a pending link into ``LINKS_TO_VALIDATE``.

        The document is keyed by ``mail_address`` when it is non-empty and by
        ``link`` otherwise; the ``email`` field is only written in the former
        case.

        Args:
            link: URL to validate later.
            mail_address: Mail address the link belongs to; may be empty.
            model: Stored verbatim in the ``model`` field.
            _all_contact_list: Objects exposing ``mail`` and ``ip_country``;
                used to look up the country for ``mail_address``.
            _used_ip: Stored verbatim in the ``_used_ip`` field.
        """
        updated_at = time.strftime("%H:%M:%S", time.localtime())

        # Default country is "FR"; when several contacts share the address the
        # last match wins (the scan deliberately does not stop early).
        ip_country = "FR"
        for contact in _all_contact_list:
            if contact.mail == mail_address:
                ip_country = contact.ip_country

        # Build the document once; field order matches the historical layout.
        replacement = {"url": link}
        if mail_address:
            replacement["email"] = mail_address
        replacement.update(
            {
                "serial": "requests",
                "model": model,
                "ip_country": ip_country,
                "_used_ip": _used_ip,
                "updated_at": updated_at,
            }
        )
        self.db[LINKS_TO_VALIDATE].replace_one(
            filter={"_id": mail_address or link},
            replacement=replacement,
            upsert=True,
        )

    def get_code_for_email(self, email: str):
        """Return the ``password`` stored for ``email`` in ``DESTINATION_EMAIL_LIST``.

        Returns:
            The password of the matching document, or ``""`` when no document
            matches or when the lookup raises (the error is logged).
        """
        try:
            mail_document = self.db[DESTINATION_EMAIL_LIST].find_one(
                filter={"_id": email}
            )
            if mail_document is None:
                return ""
            return MailAddress.from_firestore_dict(mail_document).password
        except Exception:
            self.logger.exception("get_code_for_email failed for %s", email)
            return ""

    def get_all_registered_users(self) -> list:
        """Return all ``Registered_users`` documents as ``RegisteredUserPojo``."""
        return [
            RegisteredUserPojo.from_firestore_dict(document)
            for document in self.db["Registered_users"].find()
        ]

    def get_destination_emails(self) -> list:
        """Return all ``DESTINATION_EMAIL_LIST`` documents as ``MailAddress``.

        Best effort: if reading fails part-way, the error is logged and the
        entries collected so far are returned.
        """
        email_list = []
        try:
            for document in self.db[DESTINATION_EMAIL_LIST].find():
                email_list.append(MailAddress.from_firestore_dict(document))
        except Exception:
            self.logger.exception("get_destination_emails failed")
        return email_list

    def insert_accepted_reserve(self, accepted_pojo: AcceptedResultPojo):
        """Insert ``accepted_pojo`` into ``ACCEPTED_APPOINTMENT_LIST``.

        Best effort: any exception is logged (with traceback) and swallowed.
        """
        try:
            self.db[ACCEPTED_APPOINTMENT_LIST].insert_one(
                accepted_pojo.to_firestore_dict()
            )
        except Exception:
            self.logger.exception("insert_accepted_reserve failed")

    def get_links_to_validate(self) -> list:
        """Return all ``LINKS_TO_VALIDATE`` documents as ``LinkPojo``.

        Best effort: if reading fails part-way, the error is logged and the
        entries collected so far are returned.
        """
        link_list = []
        try:
            for document in self.db[LINKS_TO_VALIDATE].find():
                link_list.append(LinkPojo.from_firestore_dict(document))
        except Exception:
            self.logger.exception("get_links_to_validate failed")
        return link_list

    def link_validated_for_result(
        self,
        link: str,
        linkPojo: LinkPojo,
        state=True,
        is_duplicated=False,
        is_invalid=False,
        segement_position=1,
        ua="",
        model="",
        timestamp_in_s: Optional[list] = None,
    ):
        """Record a link validation in today's collection and drop the pending link.

        The result document id is a "/"-separated segment of the URL:
        the second-to-last segment of ``link`` for duplicates, the
        second-to-last segment of ``linkPojo.url`` for invalid links, and the
        last segment of ``link`` otherwise. ``is_duplicated`` takes precedence
        over ``is_invalid`` when both are set.

        Args:
            link: URL that was processed; stored in the ``url`` field.
            linkPojo: Pending-link record; its ``email``, ``serial`` and
                ``url`` attributes are read.
            state: Stored in ``url_validated``.
            is_duplicated: Marks the result as ``"Double"``.
            is_invalid: Marks the result as ``"Invalid"``.
            segement_position: Appended to ``"requests:"`` for normal results.
            ua: Stored in ``validated_by_ua``.
            model: Stored in ``validated_by_model``.
            timestamp_in_s: Values joined with ``"-"`` into ``timestamp_in_s``;
                ``None`` is treated as an empty list.
        """
        if timestamp_in_s is None:
            timestamp_in_s = []
        print("link_validated_for_result() called with url = " + link)

        if is_duplicated:
            _id = link.split("/")[-2]
            validated_by = "Double"
        elif is_invalid:
            _id = linkPojo.url.split("/")[-2]
            validated_by = "Invalid"
        else:
            _id = link.split("/")[-1]
            validated_by = "requests:" + str(segement_position)
        print("link_validated_for_result() called with id = " + _id)

        collection_name = str(datetime.date.today())
        print(
            "link_validated_for_result() called with collection_name = "
            + collection_name
        )
        validated_at = time.strftime("%H:%M:%S", time.localtime())
        self.db[collection_name].find_one_and_update(
            {"_id": _id},
            {
                "$set": {
                    "url_validated": state,
                    "validated_at": validated_at,
                    "id": _id,
                    "email": linkPojo.email,
                    "url": link,
                    "validated_by_model": model,
                    "serial": linkPojo.serial,
                    "validated_by_ua": ua,
                    "timestamp_in_s": "-".join(str(x) for x in timestamp_in_s),
                    "validated_by": validated_by,
                }
            },
            upsert=True,
        )

        # Remove the pending link. save_links_to_validate keys the document by
        # e-mail when one is known and by the link URL otherwise, so fall back
        # to the URL; deleting by an empty e-mail would never match anything.
        pending_id = linkPojo.email or linkPojo.url
        self.db[LINKS_TO_VALIDATE].delete_one({"_id": pending_id})

    # -- Mail read-time tracking ---------------------------------------------
    def get_last_mail_read_time(self, mail: str) -> Optional[datetime.datetime]:
        """Return when ``mail`` was last read, or ``None`` if never recorded.

        The value is the ``last_read_at`` field written by
        ``update_mail_read_time`` (a UTC timestamp). ``None`` is also returned
        when the lookup raises; the error is logged as a warning.
        """
        try:
            doc = self.db[MAIL_READ_LOG].find_one({"_id": mail})
            if doc and "last_read_at" in doc:
                return doc["last_read_at"]
        except Exception as err:
            self.logger.warning("get_last_mail_read_time error: %s", err)
        return None

    def update_mail_read_time(self, mail: str) -> None:
        """Set the last-read time of ``mail`` to the current UTC time.

        Best effort: failures are logged as a warning and swallowed.
        """
        try:
            # Timezone-aware "now" replaces the deprecated utcnow(); the
            # instant written is the same UTC moment.
            now_utc = datetime.datetime.now(datetime.timezone.utc)
            self.db[MAIL_READ_LOG].replace_one(
                {"_id": mail},
                {"_id": mail, "last_read_at": now_utc},
                upsert=True,
            )
        except Exception as err:
            self.logger.warning("update_mail_read_time error: %s", err)
# Shared module-level instance. It is built at import time, so importing this
# module creates the MongoClient and raises ValueError when MONGO_USERNAME or
# MONGO_PASSWORD is missing from the environment.
MONGO_STORE_MANAGER = MongoDbManager()