diff --git a/queue_message/CookiesPublisher.py b/queue_message/CookiesPublisher.py index 1f4b253..2474df6 100644 --- a/queue_message/CookiesPublisher.py +++ b/queue_message/CookiesPublisher.py @@ -29,7 +29,9 @@ class CookiesPublisher: def publish_body(self, body: str): print("will push to queue {}".format(self.to_queue)) - self.channel.basic_publish(exchange='', routing_key=self.to_queue, body=body) + self.channel.basic_publish(exchange='', routing_key=self.to_queue, body=body, properties=pika.BasicProperties( + delivery_mode=pika.spec.PERSISTENT_DELIVERY_MODE + )) def message_count(self): return self.channel.queue_declare(queue=self.to_queue, durable=True).method.message_count diff --git a/workers/MessagerTransporter.py b/workers/MessagerTransporter.py new file mode 100644 index 0000000..7f4624c --- /dev/null +++ b/workers/MessagerTransporter.py @@ -0,0 +1,41 @@ +import threading + +import pika + +from queue_message.CookiesPublisher import QUEUE_HOST, CookiesPublisher, MORNING_DATA_CACHE + +credentials = pika.PlainCredentials('appointment', 'ZyuhJZ2xEYWhElhpJjy7YEpZGZwNYJz2fHIu') + + +class MessageTransporter(threading.Thread): + + def __init__(self, cookiesPublisher: CookiesPublisher, + queue_to_listen): + self.cookiesPublisher = cookiesPublisher + self.queue_to_listen = queue_to_listen + + def set_up_connection(self): + self.connection = pika.BlockingConnection( + pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials)) + self.channel = self.connection.channel() + + def listen_to_queue(self): + self.channel.basic_qos(prefetch_count=1) + self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=self.on_message) + self.channel.start_consuming() + + def on_message(self, ch, method, properties, body): + print(f" [x] Received {body}") + _received_object = body.decode("UTF-8") + _message_in_queue_count = self.cookiesPublisher.message_count() + print("message count in queue is {}".format(_message_in_queue_count)) + self.cookiesPublisher.publish_body(_received_object) + ch.basic_ack(delivery_tag=method.delivery_tag) + + +if __name__ == '__main__': + cookiesPublisher = CookiesPublisher(queue_name="MORNING_DATA_CACHE_BAK") + cookiesPublisher.set_up_connection() + message_transporter = MessageTransporter(cookiesPublisher=cookiesPublisher, queue_to_listen=MORNING_DATA_CACHE) + message_transporter.set_up_connection() + message_transporter.listen_to_queue() \ No newline at end of file