first commit
This commit is contained in:
Binary file not shown.
Binary file not shown.
Executable
+74
@@ -0,0 +1,74 @@
|
||||
import datetime
|
||||
import logging
|
||||
|
||||
from pymongo import MongoClient
|
||||
|
||||
from models.ReserveResultPojo import ReserveResultPojo
|
||||
from models.contact_pojo import ContactPojo
|
||||
|
||||
MONGO_DB_URL = "mongo.lpaconsulting.fr"
|
||||
CAPTCHA_ERROR_COLLECTION_PREFIX = "CAPTCHA_ERROR_"
|
||||
BLACK_LIST = "BLACK_LIST"
|
||||
ACCEPTED_APPOINTMENT_LIST = "ACCEPTED_APPOINTMENT_LIST"
|
||||
CONTACT_LIST_TO_BOOK = "CONTACT_LIST_TO_BOOK"
|
||||
EMAIL_LIST = "EMAIL_LIST"
|
||||
DESTINATION_EMAIL_LIST = "DESTINATION_EMAIL_LIST"
|
||||
LINKS_TO_VALIDATE = "LINKS_TO_VALIDATE"
|
||||
INVALID_EMAIL_LIST = "INVALID_EMAIL_LIST"
|
||||
|
||||
|
||||
class MongoDbManager:
|
||||
def __init__(self):
|
||||
client = MongoClient(MONGO_DB_URL, username='appointment', password='Rdv@2022', authSource='appointment')
|
||||
self.db = client.appointment
|
||||
self.logger = logging.getLogger("mongoDb")
|
||||
|
||||
def insert_one(self, collection_name: str, dict: dict):
|
||||
collection_to_use = self.db[collection_name]
|
||||
collection_to_use.insert_one(dict)
|
||||
|
||||
def insert_reserve_result(self, collection_name, reserve: ReserveResultPojo):
|
||||
try:
|
||||
collection_to_use = self.db[collection_name]
|
||||
collection_to_use.replace_one(filter={'_id': reserve.id, }, replacement=reserve.to_firestore_dict(),
|
||||
upsert=True)
|
||||
except Exception as Error:
|
||||
self.logger.info(Error)
|
||||
|
||||
def get_all_successful_items_for_day(self) -> list:
|
||||
collection_name = str(datetime.date.today())
|
||||
result_list = []
|
||||
cursor = self.db[collection_name]
|
||||
for document in cursor.find():
|
||||
result_list.append(ReserveResultPojo.from_firestore_dict(document))
|
||||
return result_list
|
||||
|
||||
def count_all_successful_items_for_day(self, day: str) -> list:
|
||||
return self.db[day].count_documents({})
|
||||
|
||||
def get_all_successful_items_for_yesterday(self) -> list:
|
||||
yesterday = datetime.date.today() - datetime.timedelta(days=1)
|
||||
collection_name = str(yesterday)
|
||||
result_list = []
|
||||
cursor = self.db[collection_name]
|
||||
for document in cursor.find():
|
||||
result_list.append(ReserveResultPojo.from_firestore_dict(document))
|
||||
return result_list
|
||||
|
||||
def get_all_successful_items_for_one_day(self, day_in_str: str) -> list:
|
||||
collection_name = day_in_str
|
||||
result_list = []
|
||||
cursor = self.db[collection_name]
|
||||
for document in cursor.find():
|
||||
result_list.append(ReserveResultPojo.from_firestore_dict(document))
|
||||
return result_list
|
||||
|
||||
def get_all_contact_to_book_list(self) -> list:
|
||||
result_list = []
|
||||
cursor = self.db[CONTACT_LIST_TO_BOOK]
|
||||
for document in cursor.find():
|
||||
result_list.append(ContactPojo.from_firestore_dict(document))
|
||||
return result_list
|
||||
|
||||
|
||||
MONGO_STORE_MANAGER = MongoDbManager()
|
||||
Reference in New Issue
Block a user