diff --git a/workers/link_validator_with_provided_list.py b/workers/link_validator_with_provided_list.py index 684bb18..e11b03b 100644 --- a/workers/link_validator_with_provided_list.py +++ b/workers/link_validator_with_provided_list.py @@ -149,44 +149,48 @@ class LinkValidatorWithProvidedList(threading.Thread): _message_in_queue_count = self.cookiesPublisher.message_count() print("message count in queue is {}".format(_message_in_queue_count)) _received_object = body.decode("UTF-8") - _received_dict = json.loads(_received_object) - _received_cookies = _received_dict["cookiesStr"] - self.cookie_str = _received_cookies - _links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) - random.shuffle(_links_to_validate) - if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: - print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) - can_continue = None - for link_to_validate in _links_to_validate: - print(link_to_validate) - self.proxy_to_use = random.choice( - self.proxy_manager.get_link_validate_proxy(_links_to_validate)) - print("proxy to use is {}".format(self.proxy_to_use)) - can_continue = self.send_request(link_to_validate, _received_dict) - # remove the tested link from link list - self.link_to_validate_list.remove(link_to_validate) - if can_continue == RequestResult.BLOCKED: - print("cannot continue, blocked, then skip") - break - else: - if can_continue == RequestResult.BAD_GATEWAY: - time.sleep(30) + try: + _received_dict = json.loads(_received_object) + _received_cookies = _received_dict["cookiesStr"] + self.cookie_str = _received_cookies + _links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list) + random.shuffle(_links_to_validate) + if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit: + print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate))) + can_continue = None + for link_to_validate in _links_to_validate: + print(link_to_validate) + self.proxy_to_use = random.choice( + self.proxy_manager.get_link_validate_proxy(_links_to_validate)) + print("proxy to use is {}".format(self.proxy_to_use)) + can_continue = self.send_request(link_to_validate, _received_dict) + # remove the tested link from link list + self.link_to_validate_list.remove(link_to_validate) + if can_continue == RequestResult.BLOCKED: + print("cannot continue, blocked, then skip") break - time.sleep(random.randint(2, 5)) - print("can continue, continue") - if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: - print("will requeue the message") - ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + else: + if can_continue == RequestResult.BAD_GATEWAY: + time.sleep(30) + break + time.sleep(random.randint(2, 5)) + print("can continue, continue") + if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS: + print("will requeue the message") + ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True) + else: + print("will ack") + ch.basic_ack(delivery_tag=method.delivery_tag) else: - print("will ack") + print("empty list, will republish message") + time.sleep(0) + print("body in json:{}".format(json.dumps(_received_dict))) + self.cookiesPublisher.publish_body(json.dumps(_received_dict)) ch.basic_ack(delivery_tag=method.delivery_tag) - else: - print("empty list, will republish message") - time.sleep(0) - print("body in json:{}".format(json.dumps(_received_dict))) - self.cookiesPublisher.publish_body(json.dumps(_received_dict)) + self.channel.stop_consuming() + except: + print("not json format will ack") ch.basic_ack(delivery_tag=method.delivery_tag) - self.channel.stop_consuming() def filter_with_ip_country(self): _link_list_to_click = []