add method to get message count
This commit is contained in:
@@ -10,12 +10,17 @@ class CookiesPublisher:
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.connection = None
|
self.connection = None
|
||||||
self.channel = None
|
self.channel = None
|
||||||
|
self.queue_method = None
|
||||||
|
|
||||||
def set_up_connection(self):
|
def set_up_connection(self):
|
||||||
self.connection = pika.BlockingConnection(
|
self.connection = pika.BlockingConnection(
|
||||||
pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
|
pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
|
||||||
self.channel = self.connection.channel()
|
self.channel = self.connection.channel()
|
||||||
# self.channel.queue_declare(queue=REQUEST_DATA_QUEUE, durable=True)
|
self.queue_method = self.channel.queue_declare(queue=REQUEST_DATA_QUEUE, durable=True)
|
||||||
|
|
||||||
def publish_body(self, body: str):
|
def publish_body(self, body: str):
|
||||||
self.channel.basic_publish(exchange='', routing_key=REQUEST_DATA_QUEUE, body=body)
|
self.channel.basic_publish(exchange='', routing_key=REQUEST_DATA_QUEUE, body=body)
|
||||||
|
|
||||||
|
def message_count(self):
|
||||||
|
return self.channel.queue_declare(queue=REQUEST_DATA_QUEUE, durable=True).method.message_count
|
||||||
|
# return self.queue_method.method.message_count
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import datetime
|
||||||
import random
|
import random
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
@@ -8,6 +9,7 @@ import pika
|
|||||||
from db.mongo_manager import MONGO_STORE_MANAGER
|
from db.mongo_manager import MONGO_STORE_MANAGER
|
||||||
from models.contact_pojo import ContactPojo
|
from models.contact_pojo import ContactPojo
|
||||||
from queue_message.CookiesPublisher import CookiesPublisher
|
from queue_message.CookiesPublisher import CookiesPublisher
|
||||||
|
from utiles import is_time_between
|
||||||
from workers.captcha_result_getter import CaptchaResultGetter, HERMES_REGISTER
|
from workers.captcha_result_getter import CaptchaResultGetter, HERMES_REGISTER
|
||||||
from workers.sender import Sender
|
from workers.sender import Sender
|
||||||
|
|
||||||
@@ -66,6 +68,10 @@ def get_valid_csrf() -> str:
|
|||||||
return new_csrf
|
return new_csrf
|
||||||
|
|
||||||
|
|
||||||
|
def is_open():
|
||||||
|
return is_time_between(datetime.time(10, 30), datetime.time(19, 00))
|
||||||
|
|
||||||
|
|
||||||
class Receiver(threading.Thread):
|
class Receiver(threading.Thread):
|
||||||
def __init__(self, sub_contact_list: list, cookiesPublisher: CookiesPublisher):
|
def __init__(self, sub_contact_list: list, cookiesPublisher: CookiesPublisher):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
@@ -87,11 +93,12 @@ class Receiver(threading.Thread):
|
|||||||
|
|
||||||
def on_message(self, ch, method, properties, body):
|
def on_message(self, ch, method, properties, body):
|
||||||
print(f" [x] Received {body}")
|
print(f" [x] Received {body}")
|
||||||
|
print("message count in queue is {}".format(self.cookiesPublisher.message_count()))
|
||||||
sender = Sender(body.decode("UTF-8"), cookiesPublisher=self.cookiesPublisher)
|
sender = Sender(body.decode("UTF-8"), cookiesPublisher=self.cookiesPublisher)
|
||||||
self.contact_list = filter_contacts(self.contact_list)
|
self.contact_list = filter_contacts(self.contact_list)
|
||||||
# remove already booked contacts
|
# remove already booked contacts
|
||||||
random.shuffle(self.contact_list)
|
random.shuffle(self.contact_list)
|
||||||
if len(self.contact_list) > 0:
|
if len(self.contact_list) > 0 and is_open():
|
||||||
captchaResultGetter = CaptchaResultGetter()
|
captchaResultGetter = CaptchaResultGetter()
|
||||||
print("contact number is {}".format(len(self.contact_list)))
|
print("contact number is {}".format(len(self.contact_list)))
|
||||||
self.contact_list = filter_contacts(self.contact_list)
|
self.contact_list = filter_contacts(self.contact_list)
|
||||||
@@ -122,7 +129,3 @@ class Receiver(threading.Thread):
|
|||||||
self.set_up_connection()
|
self.set_up_connection()
|
||||||
self.listen_to_queue(self.on_message)
|
self.listen_to_queue(self.on_message)
|
||||||
self.channel.start_consuming()
|
self.channel.start_consuming()
|
||||||
# if __name__ == '__main__':
|
|
||||||
# receiver = Receiver()
|
|
||||||
# receiver.set_up_connection()
|
|
||||||
# receiver.listen_to_queue(on_message)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user