import datetime import logging from pymongo import MongoClient from pojo.ReserveResultPojo import ReserveResultPojo from pojo.contact_pojo import ContactPojo MONGO_DB_URL = "mongo.lpaconsulting.fr" CAPTCHA_ERROR_COLLECTION_PREFIX = "CAPTCHA_ERROR_" class MongoDbManager: def __init__(self): client = MongoClient(MONGO_DB_URL, username='appointment', password='Rdv@2022', authSource='appointment') self.db = client.appointment self.logger = logging.getLogger("mongoDb") def insert_one(self, collection_name: str, dict: dict): collection_to_use = self.db[collection_name] collection_to_use.insert_one(dict) def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo): try: collection_to_use = self.db[collection_name] collection_to_use.replace_one(filter={'_id': reserve.id, }, replacement=reserve.to_firestore_dict(), upsert=True) except Exception as Error: self.logger.info(Error) def insert_captcha_error_contact(self, contact: ContactPojo): day = str(datetime.date.today()) collection_name = CAPTCHA_ERROR_COLLECTION_PREFIX + day try: collection_to_use = self.db[collection_name] collection_to_use.replace_one(filter={'_id': contact.mail, }, replacement=contact.to_firestore_dict(), upsert=True) except Exception as error: self.logger.info(error) def get_captcha_error_contacts_for_current_day(self) -> list: day = str(datetime.date.today()) collection_name = CAPTCHA_ERROR_COLLECTION_PREFIX + day cursor = self.db[collection_name] contact_list = [] for document in cursor.find(): contact_list.append(ContactPojo.from_firestore_dict(document)) return contact_list def delete_captcha_error_contact_for_current_day(self, contact: ContactPojo): day = str(datetime.date.today()) collection_name = CAPTCHA_ERROR_COLLECTION_PREFIX + day collection = self.db[collection_name] to_delete = {'_id': contact.mail} try: collection.delete_one(to_delete) except Exception as error: self.logger.info(error) if __name__ == '__main__': db_manager = MongoDbManager() print(db_manager.get_captcha_error_contacts_for_current_day())