pb with data in the queue

This commit is contained in:
2025-06-30 17:49:52 +02:00
parent 3a2983a932
commit 62a7cc020d
+38 -34
View File
@@ -149,44 +149,48 @@ class LinkValidatorWithProvidedList(threading.Thread):
_message_in_queue_count = self.cookiesPublisher.message_count() _message_in_queue_count = self.cookiesPublisher.message_count()
print("message count in queue is {}".format(_message_in_queue_count)) print("message count in queue is {}".format(_message_in_queue_count))
_received_object = body.decode("UTF-8") _received_object = body.decode("UTF-8")
_received_dict = json.loads(_received_object) try:
_received_cookies = _received_dict["cookiesStr"] _received_dict = json.loads(_received_object)
self.cookie_str = _received_cookies _received_cookies = _received_dict["cookiesStr"]
_links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) self.cookie_str = _received_cookies
random.shuffle(_links_to_validate) _links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list)
if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: random.shuffle(_links_to_validate)
print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit:
can_continue = None print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate)))
for link_to_validate in _links_to_validate: can_continue = None
print(link_to_validate) for link_to_validate in _links_to_validate:
self.proxy_to_use = random.choice( print(link_to_validate)
self.proxy_manager.get_link_validate_proxy(_links_to_validate)) self.proxy_to_use = random.choice(
print("proxy to use is {}".format(self.proxy_to_use)) self.proxy_manager.get_link_validate_proxy(_links_to_validate))
can_continue = self.send_request(link_to_validate, _received_dict) print("proxy to use is {}".format(self.proxy_to_use))
# remove the tested link from link list can_continue = self.send_request(link_to_validate, _received_dict)
self.link_to_validate_list.remove(link_to_validate) # remove the tested link from link list
if can_continue == RequestResult.BLOCKED: self.link_to_validate_list.remove(link_to_validate)
print("cannot continue, blocked, then skip") if can_continue == RequestResult.BLOCKED:
break print("cannot continue, blocked, then skip")
else:
if can_continue == RequestResult.BAD_GATEWAY:
time.sleep(30)
break break
time.sleep(random.randint(2, 5)) else:
print("can continue, continue") if can_continue == RequestResult.BAD_GATEWAY:
if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: time.sleep(30)
print("will requeue the message") break
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) time.sleep(random.randint(2, 5))
print("can continue, continue")
if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS:
print("will requeue the message")
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
print("will ack")
ch.basic_ack(delivery_tag=method.delivery_tag)
else: else:
print("will ack") print("empty list, will republish message")
time.sleep(0)
print("body in json:{}".format(json.dumps(_received_dict)))
self.cookiesPublisher.publish_body(json.dumps(_received_dict))
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
else: self.channel.stop_consuming()
print("empty list, will republish message") except:
time.sleep(0) print("not json format will ack")
print("body in json:{}".format(json.dumps(_received_dict)))
self.cookiesPublisher.publish_body(json.dumps(_received_dict))
ch.basic_ack(delivery_tag=method.delivery_tag) ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.stop_consuming()
def filter_with_ip_country(self): def filter_with_ip_country(self):
_link_list_to_click = [] _link_list_to_click = []