import datetime
import logging

from pymongo import MongoClient

from models.LinkPojo import LinkPojo
from models.ReserveResultPojo import ReserveResultPojo
from models.contact_pojo import ContactPojo

MONGO_DB_URL = "mongo.lpaconsulting.fr"

# Names used as keys into the "appointment" database. Only
# CONTACT_LIST_TO_BOOK and LINKS_TO_VALIDATE are read in this module; the
# others are presumably consumed by importers of this module.
CAPTCHA_ERROR_COLLECTION_PREFIX = "CAPTCHA_ERROR_"
BLACK_LIST = "BLACK_LIST"
ACCEPTED_APPOINTMENT_LIST = "ACCEPTED_APPOINTMENT_LIST"
CONTACT_LIST_TO_BOOK = "CONTACT_LIST_TO_BOOK"
EMAIL_LIST = "EMAIL_LIST"
DESTINATION_EMAIL_LIST = "DESTINATION_EMAIL_LIST"
LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE"
INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST"


class MongoDbManager:
    """Thin data-access wrapper around the ``appointment`` MongoDB database."""

    def __init__(self):
        """Create the Mongo client and bind the ``appointment`` database."""
        # NOTE(review): credentials are hard-coded in source control. They
        # should be moved to configuration/environment and rotated.
        client = MongoClient(MONGO_DB_URL,
                             username='appointment',
                             password='Rdv@2022',
                             authSource='appointment')
        self.db = client.appointment
        self.logger = logging.getLogger("mongoDb")

    def insert_one(self, collection_name: str, dict: dict):
        """Insert a single document into ``collection_name``.

        The parameter is named ``dict`` (shadowing the builtin) because
        renaming it would break callers that pass it by keyword.
        """
        self.db[collection_name].insert_one(dict)

    def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo):
        """Upsert ``reserve`` into ``collection_name``, keyed on ``reserve.id``.

        This is best-effort: any failure is logged and swallowed so the
        caller is never interrupted.
        """
        try:
            self.db[collection_name].replace_one(
                filter={'_id': reserve.id},
                replacement=reserve.to_firestore_dict(),
                upsert=True,
            )
        except Exception:
            # logger.exception records the traceback, which the previous
            # info-level log of the bare exception object did not.
            self.logger.exception(
                "Failed to upsert reserve result into %s", collection_name)

    def _load_reserve_results(self, collection_name: str) -> list:
        """Return every document of ``collection_name`` as a ReserveResultPojo."""
        return [
            ReserveResultPojo.from_firestore_dict(document)
            for document in self.db[collection_name].find()
        ]

    def get_all_successful_items_for_day(self) -> list:
        """Return the reserve results stored in today's collection.

        The collection name is ``str(datetime.date.today())``, i.e. the
        ISO ``YYYY-MM-DD`` form of the local date.
        """
        return self._load_reserve_results(str(datetime.date.today()))

    def count_all_successful_items_for_day(self, day: str) -> int:
        """Return the number of documents in the collection named ``day``."""
        return self.db[day].count_documents({})

    def get_all_successful_items_for_yesterday(self) -> list:
        """Return the reserve results stored in yesterday's collection."""
        yesterday = datetime.date.today() - datetime.timedelta(days=1)
        return self._load_reserve_results(str(yesterday))

    def get_all_successful_items_for_one_day(self, day_in_str: str) -> list:
        """Return the reserve results stored in the collection ``day_in_str``."""
        return self._load_reserve_results(day_in_str)

    def get_all_contact_to_book_list(self) -> list:
        """Return every document of CONTACT_LIST_TO_BOOK as a ContactPojo."""
        return [
            ContactPojo.from_firestore_dict(document)
            for document in self.db[CONTACT_LIST_TO_BOOK].find()
        ]

    def get_links_to_validate(self) -> list:
        """Return every document of LINKS_TO_VALIDATE as a LinkPojo.

        This is best-effort: on failure the error is logged and the links
        read so far (possibly none) are returned.
        """
        link_list = []
        try:
            # Explicit append loop (not a comprehension) so that a failure
            # part-way through still returns the links already converted.
            for document in self.db[LINKS_TO_VALIDATE].find():
                link_list.append(LinkPojo.from_firestore_dict(document))
        except Exception:
            self.logger.exception(
                "Failed to read links from %s", LINKS_TO_VALIDATE)
        return link_list


# Shared module-level instance, created when this module is imported.
MONGO_STORE_MANAGER = MongoDbManager()