Merge branch 'refs/heads/master' into feautre/link_checker

This commit is contained in:
2024-06-01 22:36:02 +02:00
45 changed files with 3166 additions and 125 deletions
+21 -6
View File
@@ -2,7 +2,7 @@ import threading
import pika
from queue_message.CookiesPublisher import QUEUE_HOST, CookiesPublisher, MORNING_DATA_CACHE
from queue_message.CookiesPublisher import QUEUE_HOST, CookiesPublisher, MORNING_DATA_CACHE, MORNING_DATA_CACHE_2
credentials = pika.PlainCredentials('appointment', 'ZyuhJZ2xEYWhElhpJjy7YEpZGZwNYJz2fHIu')
@@ -24,6 +24,9 @@ class MessageTransporter(threading.Thread):
self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=self.on_message)
self.channel.start_consuming()
def message_count(self):
    """Return how many messages are currently waiting in the listened queue.

    Uses a passive-style durable re-declaration of the queue, whose reply
    frame carries the current message count.
    """
    declare_result = self.channel.queue_declare(queue=self.queue_to_listen, durable=True)
    return declare_result.method.message_count
def on_message(self, ch, method, properties, body):
print(f" [x] Received {body}")
_received_object = body.decode("UTF-8")
@@ -31,11 +34,23 @@ class MessageTransporter(threading.Thread):
print("message count in queue is {}".format(_message_in_queue_count))
self.cookiesPublisher.publish_body(_received_object)
ch.basic_ack(delivery_tag=method.delivery_tag)
if self.message_count() == 0:
print("all messages are processed")
exit(0)
def migrate_message_to_queue(from_queue, to_queue="MORNING_DATA_CACHE_BAK"):
    """Drain every message from *from_queue* and republish each one on *to_queue*.

    Blocks until the transporter stops consuming (the transporter exits once
    the source queue is empty).
    """
    publisher = CookiesPublisher(queue_name=to_queue)
    publisher.set_up_connection()
    transporter = MessageTransporter(cookiesPublisher=publisher, queue_to_listen=from_queue)
    transporter.set_up_connection()
    transporter.listen_to_queue()
if __name__ == '__main__':
    # Merge-residue fix: the pre-merge entry point (publish to
    # MORNING_DATA_CACHE_BAK while listening on MORNING_DATA_CACHE) was left
    # active alongside the new migrate call and its commented-out copy.
    # Running both would block on the old consumer and never reach the
    # migration, so only the post-merge behavior is kept.
    migrate_message_to_queue(from_queue=MORNING_DATA_CACHE_2)
+2 -2
View File
@@ -105,7 +105,7 @@ class CaptchaResultGetter:
return None
def get_ch_raw_data_from_js_data(self, js_data: JsDataPojo, old_valid_cookie) -> str:
_tag_version = "4.25.1"
_tag_version = "4.29.0"
_raw_data = "jsData={}&eventCounters=%5B%5D&jsType=ch&cid={}&ddk=789361B674144528D0B7EE76B35826&Referer=https%253A%252F%252Frendezvousparis.hermes.com%252Fclient%252Fregister&request=%252Fclient%252Fregister&responsePage=origin&ddv={}".format(
js_data.to_url_encoded_json(), old_valid_cookie, _tag_version)
print("raw ch data is " + _raw_data)
@@ -172,7 +172,7 @@ class CaptchaResultGetter:
# _le_js_raw_data = self.get_le_raw_data_from_js_data(js_le_type_data=js_le_type_data,
# old_valid_cookie=old_valid_cookie)
_cid = get_datadome_cookies(old_valid_cookie)
_raw_data = "jsData={}&eventCounters=%7B%22mousemove%22%3A{}%2C%22click%22%3A{}%2C%22scroll%22%3A{}%2C%22touchstart%22%3A{}%2C%22touchend%22%3A{}%2C%22touchmove%22%3A{}%2C%22keydown%22%3A{}%2C%22keyup%22%3A{}%7D&jsType=le&cid={}&ddk=789361B674144528D0B7EE76B35826&Referer=https%253A%252F%252Frendezvousparis.hermes.com%252Fclient%252Fregister&request=%252Fclient%252Fregister&responsePage=origin&ddv=4.25.1".format(
_raw_data = "jsData={}&eventCounters=%7B%22mousemove%22%3A{}%2C%22click%22%3A{}%2C%22scroll%22%3A{}%2C%22touchstart%22%3A{}%2C%22touchend%22%3A{}%2C%22touchmove%22%3A{}%2C%22keydown%22%3A{}%2C%22keyup%22%3A{}%7D&jsType=le&cid={}&ddk=789361B674144528D0B7EE76B35826&Referer=https%253A%252F%252Frendezvousparis.hermes.com%252Fclient%252Fregister&request=%252Fclient%252Fregister&responsePage=origin&ddv=4.29.0".format(
js_le_type_data.to_url_encoded_json(), mousemove_count, click_count, scroll_count, touch_count, touch_count,
touch_move,
key_count,
@@ -0,0 +1,254 @@
import json
import logging
import random
import threading
import time
from datetime import datetime
from http.cookies import SimpleCookie
import pika
import requests
from db.mongo_manager import MONGO_STORE_MANAGER
from models.LinkPojo import LinkPojo
from models.result_pojo import RequestResult
from proxy_manager.proxy_manager import ProxyManager
from queue_message.CookiesPublisher import CookiesPublisher, REQUEST_DATA_QUEUE_TEST, TEST_QUEUE, SHARED_OBJECT, \
MORNING_DATA_CACHE_BAK
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
from utils.AppLogging import init_logger
from workers.proxies_constants import PROXY_LIST_FR
# Response text meaning an appointment request with these contact details
# was already registered (used to classify a link as a duplicate).
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
# NOTE(review): looks like a marker phrase from the generic landing page,
# used below to classify a link as invalid — confirm against the live page.
INVALID = "Depuis plus de 130 ans,"
init_logger()
# Root logger, configured by init_logger(); shared by all workers in this module.
logger = logging.getLogger()
class LinkValidatorWithProvidedList(threading.Thread):
    """Validates a provided list of appointment links.

    Listens on a cookie queue; each received message supplies the cookie (and
    optionally a JSON envelope with a user agent) used to GET the links.  The
    refreshed cookie returned by the server is published back to the queue so
    the next validation round can reuse it.
    """

    def __init__(self, cookiesPublisher: CookiesPublisher, link_list: list,
                 queue_to_listen=REQUEST_DATA_QUEUE,
                 ip_country="FR",
                 limit=40):
        super().__init__()
        self.link_to_validate_list = link_list
        self.cookie = SimpleCookie()
        self.cookiesPublisher = cookiesPublisher
        self.queue_to_listen = queue_to_listen
        self.ip_country = ip_country
        # Keep only the links whose ip_country matches ours.
        self.filter_with_ip_country()
        self.proxy_manager = ProxyManager(logger)
        # Minimum number of messages that must be waiting in the queue before
        # a received message triggers a validation round.
        self.limit = limit

    def set_up_connection(self):
        """Open the blocking AMQP connection and channel used for consuming."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=QUEUE_HOST, port=5672, credentials=credentials))
        self.channel = self.connection.channel()

    def listen_to_queue(self, callback):
        """Consume the listened queue one message at a time; blocks until stopped."""
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(queue=self.queue_to_listen, auto_ack=False, on_message_callback=callback)
        self.channel.start_consuming()

    def _rotate_cookies(self, response, received_dict=None):
        """Refresh and republish the session cookie after a successful request.

        Loads the ``Set-Cookie`` header of *response* into ``self.cookie``,
        rebuilds the ``k=v;`` cookie string, publishes it (embedded in the
        original JSON envelope when *received_dict* is given, otherwise as a
        raw string) and remembers it in ``self.cookie_str``.
        """
        self.cookie.load(response.headers['set-cookie'])
        new_cookies = {k: v.value for k, v in self.cookie.items()}
        new_cookie_str = "".join("{}={};".format(key, value) for key, value in new_cookies.items())
        print("will publish to queue {}".format(new_cookie_str))
        # upload the cookie to queue
        if received_dict is not None:
            received_dict['cookiesStr'] = new_cookie_str
            print("body in json:{}".format(json.dumps(received_dict)))
            self.cookiesPublisher.publish_body(json.dumps(received_dict))
        else:
            self.cookiesPublisher.publish_body(new_cookie_str)
        self.cookie_str = new_cookie_str

    def send_request(self, linkPojo: LinkPojo, _received_dict=None) -> RequestResult:
        """GET *linkPojo.url* with the current cookie and classify the outcome.

        Records validated / invalid / duplicated links in Mongo and rotates
        the cookie on every recognised response.  Relies on
        ``self.proxy_to_use`` having been set by the caller (``on_message``).
        Returns a RequestResult; any exception maps to PROXY_ERROR.
        """
        _ua = 'Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0.0.0 Mobile Safari/537.36'
        if _received_dict is not None:
            _ua = _received_dict['ua']
        self.cookie.load(self.cookie_str)
        headers = {
            'User-Agent': _ua,
            'Accept': '*/*',
            'Accept-Encoding': 'gzip, deflate, br',
            'Cache-Control': 'max-age=0',
            'Referer': linkPojo.url,
            'Cookie': self.cookie_str,
            'Sec-Fetch-Mode': 'navigate',
            'Host': 'rendezvousparis.hermes.com',
            'Sec-Fetch-Site': 'same-origin',
            'Sec-Fetch-Dest': 'document',
            'Accept-Language': 'fr-FR,fr;q=0.6'}
        print("received cookie is " + str(self.cookie_str))
        print("send request for link: " + linkPojo.url)
        try:
            print("will send request with ua {}".format(_ua))
            print("will send request with cookie {}".format(self.cookie_str))
            response = requests.get(url=linkPojo.url, headers=headers, verify=False, proxies=self.proxy_to_use,
                                    timeout=60)
            print(response.status_code)
            if response.status_code == 200:
                _content = response.text
                print(response.text)
                if "Votre demande de rendez-vous Maroquinerie a bien été enregistrée" in _content:
                    # Appointment confirmed: store as validated, tagged with the worker name.
                    print(response.url)
                    MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo,
                                                                  segement_position=threading.current_thread().name)
                    self._rotate_cookies(response, _received_dict)
                    return RequestResult.SUCCESS
                elif INVALID in _content:
                    # Generic landing page: the link is no longer usable.
                    MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_invalid=True)
                    self._rotate_cookies(response)
                    return RequestResult.SUCCESS
                elif DOUBLE_MESSAGE in _content:
                    # A request with the same contact details already exists.
                    print(response.url)
                    MONGO_STORE_MANAGER.link_validated_for_result(response.url, linkPojo, is_duplicated=True)
                    self._rotate_cookies(response)
                    return RequestResult.SUCCESS
                else:
                    return RequestResult.UNKNOWN
            elif response.status_code == 502:
                return RequestResult.BAD_GATEWAY
            else:
                return RequestResult.BLOCKED
        except Exception as error:
            print(error)
            return RequestResult.PROXY_ERROR

    def on_message(self, ch, method, properties, body):
        """Queue callback: take a cookie message and run a validation round.

        The message is requeued when the round ended with BAD_GATEWAY,
        PROXY_ERROR or SUCCESS (cookie still usable), acked otherwise, and
        requeued with a pause when there is nothing to validate yet.
        """
        print(f" [x] Received {body}")
        _message_in_queue_count = self.cookiesPublisher.message_count()
        print("message count in queue is {}".format(_message_in_queue_count))
        _received_object = body.decode("UTF-8")
        _received_dict = None
        # Messages containing "glrd" are JSON envelopes carrying ua + cookiesStr.
        if "glrd" in _received_object:
            _received_dict = json.loads(_received_object)
            _received_cookies = _received_dict["cookiesStr"]
        else:
            _received_cookies = _received_object
        self.cookie_str = _received_cookies
        random.shuffle(self.link_to_validate_list)
        if len(self.link_to_validate_list) > 0 and _message_in_queue_count >= self.limit:
            print("{}:links number is {}".format(threading.current_thread().name, len(self.link_to_validate_list)))
            can_continue = None
            # Iterate over a snapshot: the list is mutated inside the loop and
            # iterating it directly would silently skip every other link.
            for link_to_validate in list(self.link_to_validate_list):
                print(link_to_validate)
                self.proxy_to_use = random.choice(
                    self.proxy_manager.get_link_validate_proxy(self.link_to_validate_list))
                print("proxy to use is {}".format(self.proxy_to_use))
                can_continue = self.send_request(link_to_validate, _received_dict)
                # remove the tested link from link list
                self.link_to_validate_list.remove(link_to_validate)
                if can_continue == RequestResult.BLOCKED:
                    print("cannot continue, blocked, then skip")
                    break
                else:
                    if can_continue == RequestResult.BAD_GATEWAY:
                        time.sleep(30)
                        break
                    time.sleep(random.randint(2, 5))
                    print("can continue, continue")
            if can_continue == RequestResult.BAD_GATEWAY or can_continue == RequestResult.PROXY_ERROR or can_continue == RequestResult.SUCCESS:
                print("will requeue the message")
                ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
            else:
                print("will ack")
                ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            print("empty list, no need to ack")
            time.sleep(5)
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
            # NOTE(review): consuming stops as soon as the link list is empty or
            # the queue runs below the limit — confirm this placement is intended.
            self.channel.stop_consuming()

    def filter_with_ip_country(self):
        """Drop every link whose ip_country differs from this validator's country."""
        self.link_to_validate_list = [
            _link for _link in self.link_to_validate_list
            if _link.ip_country == self.ip_country
        ]
def validate_links(cookiesPublisher, queue_name: str, link_list: list):
    """Run one LinkValidatorWithProvidedList over *link_list*.

    limit=0 means a validation round is triggered by any received message,
    regardless of how many are still queued.  Blocks until the validator
    stops consuming.
    """
    receiver = LinkValidatorWithProvidedList(cookiesPublisher=cookiesPublisher, link_list=link_list,
                                             queue_to_listen=queue_name, ip_country="FR", limit=0)
    # threading.current_thread() replaces the deprecated currentThread() alias.
    print("{} set_up_connection".format(threading.current_thread().name))
    receiver.set_up_connection()
    receiver.listen_to_queue(receiver.on_message)
# default_segment_number: number of concurrent workers; determines the overall speed.
# divided = 4: the smaller this value, the more links are processed in one pass.
def validate_all_links():
    """Fetch all pending links from Mongo and validate them with worker threads.

    A slice (1/divided) of the links is cut into _segment_number sub-lists;
    each sub-list gets its own publisher + validate_links thread.  Returns
    once every worker has joined, or immediately when there is nothing to do.
    """
    print("will get all links")
    all_link_list = MONGO_STORE_MANAGER.get_links_to_validate()
    if not all_link_list:
        return
    # The smaller `divided`, the larger the share of links handled per run.
    divided = 3
    # Upper bound on the number of worker threads.
    default_segment_number = 20
    _links_to_process = all_link_list[0:(int(len(all_link_list) / divided))]
    _queue_name = MORNING_DATA_CACHE_BAK
    if len(all_link_list) > divided * default_segment_number:
        _segment_number = default_segment_number
    else:
        # Small batch: take everything and scale the thread count down.
        _links_to_process = all_link_list
        if len(_links_to_process) > divided:
            _segment_number = int(len(_links_to_process) / divided)
        else:
            _segment_number = 1
    _thread_list = []
    for i in range(_segment_number):
        logger.info("{}:{} links to validate".format(threading.current_thread().name, len(_links_to_process)))
        logger.info("segment is {}".format(i))
        _cookiesPublisher = CookiesPublisher(queue_name=_queue_name)
        _cookiesPublisher.set_up_connection()
        _step = int(len(_links_to_process) / _segment_number)
        _sublist = _links_to_process[i * _step:_step * (i + 1)]
        _worker = threading.Thread(target=validate_links, args=(_cookiesPublisher, MORNING_DATA_CACHE_BAK, _sublist))
        _thread_list.append(_worker)
        _worker.start()
    for _thread in _thread_list:
        _thread.join()
if __name__ == '__main__':
    # Run validation rounds forever, pausing between rounds.
    while True:
        print("call validate_all_links()")
        validate_all_links()
        delay = random.randint(10, 30)
        now = datetime.now()
        print("Current hour ", now.hour)
        print("Current minute ", now.minute)
        # Outside working hours (after 18:00 or before 07:00) back off for an hour.
        if now.hour > 18 or now.hour < 7:
            delay = 3600
        print("wait for {} seconds".format(delay))
        time.sleep(delay)
+7
View File
@@ -36,6 +36,13 @@ class Sender:
result = ReserveResultPojo(type=status, phone=contact.phone, message=status.value, url=url,
firstName=contact.first_name, lastName=contact.last_name, email=contact.mail,
passport=contact.passport, ccid=contact.ccid)
if "oxylabs" in self.proxy_to_use["http"]:
if "mob" in self.proxy_to_use["http"]:
result.proxy = "oxylabs_mob"
else:
result.proxy = "oxylabs_res"
else:
result.proxy = "data_impulse"
result.id = id
result.store_type = store_type
result.created_at = time.strftime("%H:%M:%S", time.localtime())