can check link
This commit is contained in:
@@ -10,8 +10,10 @@ import requests
|
|||||||
from db.mongo_manager import MONGO_STORE_MANAGER
|
from db.mongo_manager import MONGO_STORE_MANAGER
|
||||||
from models.result_pojo import RequestResult
|
from models.result_pojo import RequestResult
|
||||||
from proxy_manager.proxy_manager import ProxyManager
|
from proxy_manager.proxy_manager import ProxyManager
|
||||||
from queue_message.CookiesPublisher import CookiesPublisher, TEST_QUEUE, MORNING_DATA_CACHE
|
from queue_message.CookiesPublisher import CookiesPublisher, TEST_QUEUE, MORNING_DATA_CACHE, MORNING_DATA_CACHE_BAK, \
|
||||||
|
REQUEST_DATA_QUEUE_TEST
|
||||||
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
|
from queue_message.appointmentrequestsender import QUEUE_HOST, REQUEST_DATA_QUEUE, credentials
|
||||||
|
from request_sender import logger
|
||||||
from workers.proxies_constants import PROXY_LIST_FR
|
from workers.proxies_constants import PROXY_LIST_FR
|
||||||
|
|
||||||
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
|
DOUBLE_MESSAGE = "Une demande de rendez-vous a déjà été enregistrée avec ces coordonnées"
|
||||||
@@ -20,15 +22,15 @@ INVALID = "Depuis plus de 130 ans,"
|
|||||||
|
|
||||||
class ResultLinkChecker(threading.Thread):
|
class ResultLinkChecker(threading.Thread):
|
||||||
|
|
||||||
def __init__(self, cookiesPublisher: CookiesPublisher, all_links: list, proxy_manager: ProxyManager,
|
def __init__(self, cookiesPublisher: CookiesPublisher, all_links: list, proxy_manager: ProxyManager, logger,
|
||||||
queue_to_listen=REQUEST_DATA_QUEUE,
|
queue_to_listen=REQUEST_DATA_QUEUE,
|
||||||
ip_country="FR", segment_position=1,
|
ip_country="FR",
|
||||||
limit=40):
|
limit=40, ):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self.link_to_validate_list = all_links
|
self.link_to_validate_list = all_links
|
||||||
self.cookie = SimpleCookie()
|
self.cookie = SimpleCookie()
|
||||||
self.cookiesPublisher = cookiesPublisher
|
self.cookiesPublisher = cookiesPublisher
|
||||||
self.segment_position = segment_position
|
self.logger = logger
|
||||||
self.queue_to_listen = queue_to_listen
|
self.queue_to_listen = queue_to_listen
|
||||||
self.ip_country = ip_country
|
self.ip_country = ip_country
|
||||||
self.proxy_manager = proxy_manager
|
self.proxy_manager = proxy_manager
|
||||||
@@ -103,6 +105,12 @@ class ResultLinkChecker(threading.Thread):
|
|||||||
print(error)
|
print(error)
|
||||||
return RequestResult.PROXY_ERROR
|
return RequestResult.PROXY_ERROR
|
||||||
|
|
||||||
|
def run(self):
    """Thread entry point: announce the thread, connect, and consume.

    Logs the current thread's name through ``self.logger``, calls
    ``set_up_connection()``, registers ``on_message`` through
    ``listen_to_queue()`` and finally calls
    ``self.channel.start_consuming()``.
    """
    # threading.currentThread() is a deprecated alias (since Python 3.10);
    # current_thread() returns the same Thread object.
    self.logger.info(threading.current_thread().name + " starts")
    self.set_up_connection()
    self.listen_to_queue(self.on_message)
    # NOTE(review): presumably blocks until consuming is stopped or the
    # channel is closed - confirm against the queue client in use.
    self.channel.start_consuming()
|
||||||
|
|
||||||
def on_message(self, ch, method, properties, body):
|
def on_message(self, ch, method, properties, body):
|
||||||
print(f" [x] Received {body}")
|
print(f" [x] Received {body}")
|
||||||
_message_in_queue_count = self.cookiesPublisher.message_count()
|
_message_in_queue_count = self.cookiesPublisher.message_count()
|
||||||
@@ -147,25 +155,39 @@ class ResultLinkChecker(threading.Thread):
|
|||||||
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
|
ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
|
||||||
|
|
||||||
|
|
||||||
def start_check_links(_queue_name, link_list, logger):
    """Build a ResultLinkChecker for ``link_list`` and attach it to a queue.

    Args:
        _queue_name: Name of the queue the checker listens to.
        link_list: Links handed to the checker as ``all_links``.
        logger: Logger object forwarded to the checker.

    A fresh ``ProxyManager`` and a ``CookiesPublisher`` bound to
    ``REQUEST_DATA_QUEUE_TEST`` are created for every call.
    """
    proxies = ProxyManager()

    publisher = CookiesPublisher(queue_name=REQUEST_DATA_QUEUE_TEST)
    publisher.set_up_connection()

    checker_options = dict(
        cookiesPublisher=publisher,
        proxy_manager=proxies,
        all_links=link_list,
        queue_to_listen=_queue_name,
        ip_country="FR",
        logger=logger,
        limit=0,
    )
    checker = ResultLinkChecker(**checker_options)

    print("will connect to queue")
    checker.set_up_connection()
    # NOTE(review): unlike ResultLinkChecker.run(), no start_consuming() call
    # follows here - confirm listen_to_queue() is enough to begin consuming.
    checker.listen_to_queue(checker.on_message)
|
||||||
|
|
||||||
|
|
||||||
|
def _partition_links(links, parts):
    """Split ``links`` into ``parts`` contiguous slices without dropping items."""
    if parts <= 0:
        return []
    base, extra = divmod(len(links), parts)
    slices = []
    start = 0
    for index in range(parts):
        # The first ``extra`` slices take one additional item, so the
        # remainder is spread out instead of being discarded.
        stop = start + base + (1 if index < extra else 0)
        slices.append(links[start:stop])
        start = stop
    return slices


def check_all_links(_segment_number=100):
    """Check today's validated links using ``_segment_number`` worker threads.

    Reads the day's successful items from ``MONGO_STORE_MANAGER``, keeps the
    URLs whose ``url_validated`` flag is ``True``, splits them into
    ``_segment_number`` contiguous slices and runs ``start_check_links`` on
    each slice in its own thread, all listening to
    ``MORNING_DATA_CACHE_BAK``. Returns after every thread has been joined.

    Args:
        _segment_number: Number of worker threads (and link slices) to create.
            A value of zero or less starts no thread.

    Every link is assigned to exactly one slice. The previous floor-division
    step dropped the remainder, and produced only empty slices when there
    were fewer links than segments.
    """
    _queue_name = MORNING_DATA_CACHE_BAK
    _all_items = MONGO_STORE_MANAGER.get_all_successful_items_for_day()
    _link_list = [_item.url for _item in _all_items if _item.url_validated is True]
    print(_link_list)

    thread_list = []
    for i, _sublist in enumerate(_partition_links(_link_list, _segment_number)):
        logger.info("segment is {}".format(i))
        _thread1 = threading.Thread(target=start_check_links, args=(_queue_name, _sublist, logger))
        thread_list.append(_thread1)
        _thread1.start()

    # Wait for every worker before returning.
    for t in thread_list:
        t.join()

    print("filter links with ip_country")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
check_all_links()
|
check_all_links()
|
||||||
|
|||||||
Reference in New Issue
Block a user