can use multi thread to validate links
This commit is contained in:
+1
-1
@@ -137,7 +137,7 @@ class MongoDbManager:
|
|||||||
|
|
||||||
collection = self.db[collection_name]
|
collection = self.db[collection_name]
|
||||||
validated_at = time.strftime("%H:%M:%S", time.localtime())
|
validated_at = time.strftime("%H:%M:%S", time.localtime())
|
||||||
validated_by = "requests" + str(segement_position)
|
validated_by = "requests:" + str(segement_position)
|
||||||
if is_invalid:
|
if is_invalid:
|
||||||
validated_by = "Invalid"
|
validated_by = "Invalid"
|
||||||
if is_duplicated:
|
if is_duplicated:
|
||||||
|
|||||||
@@ -37,4 +37,3 @@ class CookiesPublisher:
|
|||||||
|
|
||||||
def message_count(self):
|
def message_count(self):
|
||||||
return self.channel.queue_declare(queue=self.to_queue, durable=True).method.message_count
|
return self.channel.queue_declare(queue=self.to_queue, durable=True).method.message_count
|
||||||
# return self.queue_method.method.message_count
|
|
||||||
|
|||||||
+2
-2
@@ -5,14 +5,14 @@ from request_sender_test import start_send_requests
|
|||||||
|
|
||||||
|
|
||||||
def start_book_appointment():
|
def start_book_appointment():
|
||||||
start_send_requests(thread_number=25, file_path='~/Desktop/contact_list_2024-04-25_2.xlsx')
|
start_send_requests(thread_number=25, file_path='~/Desktop/contact_list_2024-05-04.xlsx')
|
||||||
|
|
||||||
|
|
||||||
def start_check_results_job(sched):
|
def start_check_results_job(sched):
|
||||||
sched.add_job(start_book_appointment, 'cron', day_of_week='mon-sat', hour='10',
|
sched.add_job(start_book_appointment, 'cron', day_of_week='mon-sat', hour='10',
|
||||||
minute='30',
|
minute='30',
|
||||||
misfire_grace_time=10,
|
misfire_grace_time=10,
|
||||||
second='0', timezone='Europe/Paris', max_instances=1, args=[])
|
second='20', timezone='Europe/Paris', max_instances=1, args=[])
|
||||||
|
|
||||||
|
|
||||||
def config_and_start_jobs():
|
def config_and_start_jobs():
|
||||||
|
|||||||
@@ -0,0 +1,224 @@
|
|||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
from http.cookies import SimpleCookie
|
||||||
|
|
||||||
|
import pika
|
||||||
|
import requests
|
||||||
|
|
||||||
|
from db.mongo_manager import MONGO_STORE_MANAGER
|
||||||
|
from models.LinkPojo import LinkPojo
|
||||||
|
from models.result_pojo import RequestResult
|
||||||
|
from proxy_manager.proxy_manager import ProxyManager
|
||||||
|
from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST, TEST_QUEUE, SHARED_OBJECT
|
||||||
|
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
|
||||||
|
from utils.AppLogging import init_logger
|
||||||
|
from workers.proxies_constants import PROXY_LIST_FR
|
||||||
|
|
||||||
|
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
|
||||||
|
INVALID = "Depuis plus de 130 ans,"
|
||||||
|
|
||||||
|
init_logger()
|
||||||
|
logger = logging.getLogger()
|
||||||
|
|
||||||
|
|
||||||
|
class LinkValidatorWithProvidedList(threading.Thread):
|
||||||
|
|
||||||
|
def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list,
|
||||||
|
queue_to_listen=REQUEST_DATA_QUEUE,
|
||||||
|
ip_country="FR",
|
||||||
|
limit=40):
|
||||||
|
super().__init__()
|
||||||
|
self.link_to_validate_list = link_list
|
||||||
|
self.cookie = SimpleCookie()
|
||||||
|
self.cookiesPublisher = cookiesPublisher
|
||||||
|
self.queue_to_listen = queue_to_listen
|
||||||
|
self.ip_country = ip_country
|
||||||
|
self.filter_with_ip_country()
|
||||||
|
self.proxy_manager = ProxyManager()
|
||||||
|
self.limit = limit
|
||||||
|
|
||||||
|
def set_up_connection(self):
|
||||||
|
self.connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
|
||||||
|
self.channel = self.connection.channel()
|
||||||
|
|
||||||
|
def listen_to_queue(self, callback):
|
||||||
|
self.channel.basic_qos(prefetch_count=1)
|
||||||
|
self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback)
|
||||||
|
self.channel.start_consuming()
|
||||||
|
|
||||||
|
def send_request(self, linkPojo: LinkPojo, _received_dict=None) -> RequestResult:
|
||||||
|
_ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36'
|
||||||
|
if _received_dict is not None:
|
||||||
|
_ua = _received_dict['ua']
|
||||||
|
self.cookie.load(self.cookie_str)
|
||||||
|
headers = {
|
||||||
|
'User-Agent': _ua,
|
||||||
|
'Accept': '*/*',
|
||||||
|
'Accept-Encoding': 'gzip, deflate, br',
|
||||||
|
'Cache-Control': 'max-age=0',
|
||||||
|
'Referer': linkPojo.url,
|
||||||
|
'Cookie': self.cookie_str,
|
||||||
|
'Sec-Fetch-Mode': 'navigate',
|
||||||
|
'Host': 'rendezvousparis.hermes.com',
|
||||||
|
'Sec-Fetch-Site': 'same-origin',
|
||||||
|
'Sec-Fetch-Dest': 'document',
|
||||||
|
'Accept-Language': 'fr-FR,fr;q=0.6'}
|
||||||
|
_proxy_to_use = random.choice(self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list))
|
||||||
|
print(_proxy_to_use)
|
||||||
|
print("received cookie is " + str(self.cookie_str))
|
||||||
|
print("send request for link: " + linkPojo.url)
|
||||||
|
try:
|
||||||
|
print("will send request with ua {}".format(_ua))
|
||||||
|
print("will send request with cookie {}".format(self.cookie_str))
|
||||||
|
response = requests.get(url=linkPojo.url, headers=headers, verify=False, proxies=self.proxy_to_use,
|
||||||
|
timeout=60)
|
||||||
|
print(response.status_code)
|
||||||
|
if response.status_code == 200:
|
||||||
|
_content = response.text
|
||||||
|
print(response.text)
|
||||||
|
if "Votre demande de rendez-vous Maroquinerie a bien été enregistrée" in _content:
|
||||||
|
print(response.url)
|
||||||
|
MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo,
|
||||||
|
segement_position=threading.currentThread().name)
|
||||||
|
# set new cookies
|
||||||
|
_cookies_to_set = response.headers['set-cookie']
|
||||||
|
self.cookie.load(_cookies_to_set)
|
||||||
|
new_cookies = {k: v.value for k, v in self.cookie.items()}
|
||||||
|
new_coolies_str = ""
|
||||||
|
for key in new_cookies:
|
||||||
|
new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";"
|
||||||
|
print("will publish to queue {}".format(new_coolies_str))
|
||||||
|
# upload the cookie to queue
|
||||||
|
if _received_dict is not None:
|
||||||
|
_received_dict['cookiesStr'] = new_coolies_str
|
||||||
|
print("body in json:{}".format(json.dumps(_received_dict)))
|
||||||
|
self.cookiesPublisher.publish_body(json.dumps(_received_dict))
|
||||||
|
else:
|
||||||
|
self.cookiesPublisher.publish_body(new_coolies_str)
|
||||||
|
self.cookie_str = new_coolies_str
|
||||||
|
return RequestResult.SUCCESS
|
||||||
|
elif INVALID in _content:
|
||||||
|
|
||||||
|
MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True)
|
||||||
|
# set new cookies
|
||||||
|
_cookies_to_set = response.headers['set-cookie']
|
||||||
|
self.cookie.load(_cookies_to_set)
|
||||||
|
new_cookies = {k: v.value for k, v in self.cookie.items()}
|
||||||
|
new_coolies_str = ""
|
||||||
|
for key in new_cookies:
|
||||||
|
new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";"
|
||||||
|
print("will publish to queue {}".format(new_coolies_str))
|
||||||
|
# upload the cookie to queue
|
||||||
|
self.cookiesPublisher.publish_body(new_coolies_str)
|
||||||
|
self.cookie_str = new_coolies_str
|
||||||
|
return RequestResult.SUCCESS
|
||||||
|
|
||||||
|
elif DOUBLE_MESSAGE in _content:
|
||||||
|
print(response.url)
|
||||||
|
MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_duplicated=True)
|
||||||
|
# set new cookies
|
||||||
|
_cookies_to_set = response.headers['set-cookie']
|
||||||
|
self.cookie.load(_cookies_to_set)
|
||||||
|
new_cookies = {k: v.value for k, v in self.cookie.items()}
|
||||||
|
new_coolies_str = ""
|
||||||
|
for key in new_cookies:
|
||||||
|
new_coolies_str = new_coolies_str + key + "=" + new_cookies[key] + ";"
|
||||||
|
print("will publish to queue {}".format(new_coolies_str))
|
||||||
|
# upload the cookie to queue
|
||||||
|
self.cookiesPublisher.publish_body(new_coolies_str)
|
||||||
|
self.cookie_str = new_coolies_str
|
||||||
|
return RequestResult.SUCCESS
|
||||||
|
else:
|
||||||
|
return RequestResult.UNKNOWN
|
||||||
|
elif response.status_code == 502:
|
||||||
|
return RequestResult.BAD_GATEWAY
|
||||||
|
else:
|
||||||
|
return RequestResult.BLOCKED
|
||||||
|
except Exception as error:
|
||||||
|
print(error)
|
||||||
|
return RequestResult.PROXY_ERROR
|
||||||
|
|
||||||
|
def on_message(self, ch, method, properties, body):
|
||||||
|
print(f" [x] Received {body}")
|
||||||
|
_message_in_queue_count = self.cookiesPublisher.message_count()
|
||||||
|
print("message count in queue is {}".format(_message_in_queue_count))
|
||||||
|
_received_object = body.decode("UTF-8")
|
||||||
|
_received_dict = None
|
||||||
|
if "glrd" in _received_object:
|
||||||
|
_received_dict = json.loads(_received_object)
|
||||||
|
_received_cookies = _received_dict["cookiesStr"]
|
||||||
|
else:
|
||||||
|
_received_cookies = _received_object
|
||||||
|
self.cookie_str = _received_cookies
|
||||||
|
random.shuffle(self.link_to_validate_list)
|
||||||
|
if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit:
|
||||||
|
print("{}:links number is {}".format(threading.currentThread().name, len(self.link_to_validate_list)))
|
||||||
|
can_continue = None
|
||||||
|
for link_to_validate in self.link_to_validate_list:
|
||||||
|
print(link_to_validate)
|
||||||
|
self.proxy_to_use = random.choice(PROXY_LIST_FR)
|
||||||
|
can_continue = self.send_request(link_to_validate, _received_dict)
|
||||||
|
# remove the tested link from link list
|
||||||
|
self.link_to_validate_list.remove(link_to_validate)
|
||||||
|
if can_continue == RequestResult.BLOCKED:
|
||||||
|
print("cannot continue, blocked, then skip")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if can_continue == RequestResult.BAD_GATEWAY:
|
||||||
|
time.sleep(30)
|
||||||
|
break
|
||||||
|
time.sleep(random.randint(2, 5))
|
||||||
|
print("can continue, continue")
|
||||||
|
if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS:
|
||||||
|
print("will requeue the message")
|
||||||
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
|
||||||
|
else:
|
||||||
|
print("will ack")
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
else:
|
||||||
|
print("empty list, no need to ack")
|
||||||
|
time.sleep(5)
|
||||||
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
|
||||||
|
self.channel.stop_consuming()
|
||||||
|
|
||||||
|
def filter_with_ip_country(self):
|
||||||
|
_link_list_to_click = []
|
||||||
|
for _link in self.link_to_validate_list:
|
||||||
|
if _link.ip_country == self.ip_country:
|
||||||
|
_link_list_to_click.append(_link)
|
||||||
|
self.link_to_validate_list = _link_list_to_click
|
||||||
|
|
||||||
|
|
||||||
|
def validate_links(cookiesPublisher, queue_name: str, link_list: list):
|
||||||
|
receiver = LinkValidatorWithProvidedList(cookiesPublisher=cookiesPublisher, link_list=link_list,
|
||||||
|
queue_to_listen=queue_name, ip_country="FR", limit=0)
|
||||||
|
print("{} set_up_connection".format(threading.currentThread().name))
|
||||||
|
receiver.set_up_connection()
|
||||||
|
receiver.listen_to_queue(receiver.on_message)
|
||||||
|
|
||||||
|
|
||||||
|
def validate_all_links():
|
||||||
|
all_link_list = MONGO_STORE_MANAGER.get_links_to_validate()
|
||||||
|
_queue_name = TEST_QUEUE
|
||||||
|
_segment_number = 4
|
||||||
|
last_thread = None
|
||||||
|
for i in range(0, _segment_number):
|
||||||
|
logger.info("{}:{} links to validate".format(threading.currentThread().name, len(all_link_list)))
|
||||||
|
logger.info("segment is {}".format(i))
|
||||||
|
_cookiesPublisher = CookiesPublisher(queue_name=TEST_QUEUE)
|
||||||
|
_cookiesPublisher.set_up_connection()
|
||||||
|
_step = int(len(all_link_list) / _segment_number)
|
||||||
|
_sublist = all_link_list[i * _step:_step * (i + 1)]
|
||||||
|
_thread1 = threading.Thread(target=validate_links, args=(_cookiesPublisher, SHARED_OBJECT, _sublist))
|
||||||
|
last_thread = _thread1
|
||||||
|
_thread1.start()
|
||||||
|
last_thread.join()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
# generate test data
|
||||||
|
validate_all_links()
|
||||||
Reference in New Issue
Block a user