Validate the links with a Paris IP

This commit is contained in:
2024-12-14 11:51:05 +01:00
parent 4b17fa27c8
commit bda446f2d9
7 changed files with 134 additions and 53 deletions
+46 -48
View File
@@ -13,11 +13,10 @@ from db.mongo_manager import MONGO_STORE_MANAGER
from models.LinkPojo import LinkPojo
from models.result_pojo import RequestResult
from proxy_manager.proxy_manager import ProxyManager
from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST, TEST_QUEUE, SHARED_OBJECT, \
MORNING_DATA_CACHE_BAK, MORNING_DATA_CACHE, MORNING_DATA_CACHE_2
from queue_message.CookiesPublisher import CookiesPublisher, MORNING_DATA_CACHE
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
from utils.AppLogging import init_logger
from workers.proxies_constants import PROXY_LIST_FR
from utils.user_agent_helper import generate_headers_from_request_message
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
INVALID = "Depuis plus de 130 ans,"
@@ -26,13 +25,21 @@ init_logger()
logger = logging.getLogger()
def filter_link_pojo_list_with_serial(_received_dict, link_to_validate_list):
    """Return the links whose ``serial`` equals the serial of the received message.

    Args:
        _received_dict: Decoded queue message; its ``"serial"`` entry is the
            value to match against.
        link_to_validate_list: Iterable of link objects exposing a ``serial``
            attribute.

    Returns:
        A new list with the matching links, in their original order. The
        input list is not modified.

    Raises:
        KeyError: If ``_received_dict`` has no ``"serial"`` entry.
    """
    _serial = _received_dict["serial"]
    # The "model" entry is not needed for filtering, so it is deliberately not
    # read here: a message without it must not fail with a KeyError.
    return [
        link_pojo
        for link_pojo in link_to_validate_list
        if link_pojo.serial == _serial
    ]
class LinkValidatorWithProvidedList(threading.Thread):
def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list,
def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list, _contact_serial_list,
queue_to_listen=REQUEST_DATA_QUEUE,
ip_country="FR",
limit=40):
super().__init__()
self.contact_serial_list = _contact_serial_list
self.link_to_validate_list = link_list
self.cookie = SimpleCookie()
self.cookiesPublisher = cookiesPublisher
@@ -57,25 +64,9 @@ class LinkValidatorWithProvidedList(threading.Thread):
if _received_dict is not None:
_ua = _received_dict['ua']
_model = _received_dict['model']
_ua = "Mozilla/5.0 (Linux; Android 9; {}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.58 Mobile Safari/537.36".format(
_model)
logger.info("model: %s", _model)
self.cookie.load(self.cookie_str)
headers = {
'User-Agent': _ua,
'Accept': '*/*',
'Accept-Encoding': 'gzip, deflate, br',
'Cache-Control': 'max-age=0',
'Referer': linkPojo.url,
'Cookie': self.cookie_str,
'Sec-Fetch-Mode': 'navigate',
'Host': 'rendezvousparis.hermes.com',
'Sec-Fetch-Site': 'same-origin',
'sec-ch-ua': '"Brave";v="125", "Chromium";v="125", "Not.A/Brand";v="24"',
'sec-ch-ua-platform': '"Android"',
'sec-ch-ua-model': '""',
'Sec-Fetch-Dest': 'document',
'Accept-Language': 'fr-FR,fr;q=0.6'}
headers = generate_headers_from_request_message(_received_dict, self.cookie_str)
print("received cookie is " + str(self.cookie_str))
print("send request for link: " + linkPojo.url)
try:
@@ -110,7 +101,6 @@ class LinkValidatorWithProvidedList(threading.Thread):
self.cookie_str = new_coolies_str
return RequestResult.SUCCESS
elif INVALID in _content:
MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True, ua=_ua)
# set new cookies
_cookies_to_set = response.headers['set-cookie']
@@ -158,14 +148,15 @@ class LinkValidatorWithProvidedList(threading.Thread):
_received_dict = json.loads(_received_object)
_received_cookies = _received_dict["cookiesStr"]
self.cookie_str = _received_cookies
random.shuffle(self.link_to_validate_list)
if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit:
print("{}:links number is {}".format(threading.currentThread().name, len(self.link_to_validate_list)))
_links_to_validate = filter_link_pojo_list_with_serial(_received_dict, self.link_to_validate_list)
random.shuffle(_links_to_validate)
if len(_links_to_validate) > 0 and _message_in_queue_count >= self.limit:
print("{}:links number is {}".format(threading.currentThread().name, len(_links_to_validate)))
can_continue = None
for link_to_validate in self.link_to_validate_list:
for link_to_validate in _links_to_validate:
print(link_to_validate)
self.proxy_to_use = random.choice(
self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list))
self.proxy_manager.get_link_validate_proxy(_links_to_validate))
print("proxy to use is {}".format(self.proxy_to_use))
can_continue = self.send_request(link_to_validate, _received_dict)
# remove the tested link from link list
@@ -186,9 +177,11 @@ class LinkValidatorWithProvidedList(threading.Thread):
print("will ack")
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
print("empty list, no need to ack")
time.sleep(5)
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
print("empty list, will republish message")
time.sleep(0)
print("body in json:{}".format(json.dumps(_received_dict)))
self.cookiesPublisher.publish_body(json.dumps(_received_dict))
ch.basic_ack(delivery_tag=method.delivery_tag)
self.channel.stop_consuming()
def filter_with_ip_country(self):
@@ -199,9 +192,9 @@ class LinkValidatorWithProvidedList(threading.Thread):
self.link_to_validate_list = _link_list_to_click
def validate_links(cookiesPublisher, queue_name: str, link_list: list):
def validate_links(cookiesPublisher, queue_name: str, link_list: list, _contact_serial_list):
receiver = LinkValidatorWithProvidedList(cookiesPublisher=cookiesPublisher, link_list=link_list,
queue_to_listen=queue_name, ip_country="FR", limit=0)
queue_to_listen=queue_name, _contact_serial_list= _contact_serial_list, ip_country="FR", limit=0)
print("{} set_up_connection".format(threading.currentThread().name))
receiver.set_up_connection()
receiver.listen_to_queue(receiver.on_message)
@@ -209,26 +202,28 @@ def validate_links(cookiesPublisher, queue_name: str, link_list: list):
# default_segment_number: the concurrency level; it determines the speed
# divided = 4: the smaller it is, the more links are processed at once
def validate_all_links():
def validate_all_links(_contact_serial_list):
print("will get all links")
all_link_list = MONGO_STORE_MANAGER.get_links_to_validate()
# get the first 50 links
if len(all_link_list) == 0:
return
divided = 3
default_segment_number = 20
_first_25_percent_links = all_link_list[0:(int(len(all_link_list) / divided))]
# divided = 1
# default_segment_number = 20
# _first_25_percent_links = all_link_list[0:(int(len(all_link_list) / divided))]
_first_25_percent_links = all_link_list
_queue_name = MORNING_DATA_CACHE
# _queue_name = MORNING_DATA_CACHE_BAK
if len(all_link_list) > divided * default_segment_number:
_segment_number = default_segment_number
else:
_first_25_percent_links = all_link_list
if len(_first_25_percent_links) > divided:
_segment_number = int(len(_first_25_percent_links) / divided)
else:
_segment_number = 1
# # _queue_name = MORNING_DATA_CACHE_BAK
# if len(all_link_list) > divided * default_segment_number:
# _segment_number = default_segment_number
# else:
# _first_25_percent_links = all_link_list
# if len(_first_25_percent_links) > divided:
# _segment_number = int(len(_first_25_percent_links) / divided)
# else:
# _segment_number = 1
_thread_list = []
_segment_number =1
for i in range(0, _segment_number):
logger.info("{}:{} links to validate".format(threading.currentThread().name, len(_first_25_percent_links)))
logger.info("segment is {}".format(i))
@@ -236,7 +231,8 @@ def validate_all_links():
_cookiesPublisher.set_up_connection()
_step = int(len(_first_25_percent_links) / _segment_number)
_sublist = _first_25_percent_links[i * _step:_step * (i + 1)]
_thread1 = threading.Thread(target=validate_links, args=(_cookiesPublisher, _queue_name, _sublist))
_thread1 = threading.Thread(target=validate_links,
args=(_cookiesPublisher, _queue_name, _sublist, _contact_serial_list))
_thread_list.append(_thread1)
_thread1.start()
for _thread in _thread_list:
@@ -245,10 +241,12 @@ def validate_all_links():
if __name__ == '__main__':
# generate test data
# contact_serial_map_list = MONGO_STORE_MANAGER.get_all_contact_serial_list()
while True:
print("call validate_all_links()")
validate_all_links()
delay = random.randint(10, 30)
validate_all_links([])
# delay = random.randint(10, 30)
delay = random.randint(1, 10)
current_time = datetime.now()
current_hour = current_time.hour
print("Current hour ", current_time.hour)