Summary.

0. This framework is for crawlers only: the framework itself schedules the url requests, and development has to follow that pattern. It does not provide celery-style general-purpose distributed task execution, and it is not convenient to test. For everything else there is a separate distributed framework, based on functional programming, that schedules arbitrary functions; it is compatible with any new or existing code and covers any scenario that needs to be distributed.

A distributed spider framework, much simpler than scrapy. There is no switching back and forth between item, pipeline, middleware, spider, settings and run files: everything lives in a single file, which saves a lot of development time. The structure is deliberately loose: you only override one method and parse and persist the results however you like, with no item definitions and no pipeline code to write. The built-in RequestClient offers simple cookie handling and one-setting switching between ip proxies, so no middleware is needed for that either.

rabbitmq is recommended as the message broker: messages are acknowledged only after they have been consumed correctly, so the program can be stopped at any time. If redis is used and the program is stopped arbitrarily, the tasks that are currently being requested or not yet parsed and persisted are lost; the more threads and processes, the more tasks are lost.
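To make the development model concrete, here is a minimal sketch of a subclass and its start-up. The module name custom_spider, the class name, the seed key demo:listpage_urls and the parsing step are invented for illustration; BaseCustomSpider, put_seed_task_to_broker, _dispacth_request, the chained setters and start_craw are the framework pieces defined in the code below.

# A minimal usage sketch (hypothetical module and class names), assuming the framework code below is saved as custom_spider.py.
from collections import OrderedDict

from custom_spider import BaseCustomSpider  # assumed module name for the framework code below


class DemoListPageSpider(BaseCustomSpider):
    def request_and_extract(self, url, meta: OrderedDict):
        # Always go through _dispacth_request so retries, proxy switching and the per-minute statistics keep working.
        response = self._dispacth_request(url)
        if response is None:
            return  # gave up after the maximum number of request retries
        # Parse and persist however you like here; no item or pipeline is required.
        print(url, meta, len(response.text))


if __name__ == '__main__':
    spider = DemoListPageSpider('demo:listpage_urls', threads_num=100)
    # Seeds must be OrderedDicts containing at least a 'url' key; extra keys travel along as meta.
    for page in range(1, 101):
        spider.put_seed_task_to_broker('demo:listpage_urls', OrderedDict(url=f'https://example.com/list?page={page}'))
    # Chained configuration, then a non-blocking start; swapping BaseCustomSpider for BaseRabbitmqSpider
    # gives the rabbitmq behaviour described above without touching this subclass.
    spider.set_request_timeout(30).set_request_proxy('kuai').start_craw()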
# coding=utf-8
import abc
import math
import json
import queue
import time
from collections import OrderedDict
# noinspection PyUnresolvedReferences
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import Lock

from pika import BasicProperties
# noinspection PyUnresolvedReferences
from app.utils_ydf import LoggerMixin, LogManager, MongoMixin, RedisMixin, RequestClient, decorators, RedisBulkWriteHelper, RedisOperation, MongoBulkWriteHelper, MysqlBulkWriteHelper, RabbitMqHelper


class BoundedThreadPoolExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor with a bounded work queue, so submit() blocks instead of buffering seeds without limit."""

    def __init__(self, max_workers=None, thread_name_prefix=''):
        super().__init__(max_workers, thread_name_prefix)
        self._work_queue = queue.Queue(max_workers * 2)


class StatusError(Exception):
    pass
class VolunteerErrorForSpiderRetry(Exception):
    """
    Errors of this type, when caught by __request_and_extract, are not written to the error log; they exist only to trigger a retry.
    """
# noinspection PyBroadException
class BaseCustomSpider(LoggerMixin, MongoMixin, RedisMixin, metaclass=abc.ABCMeta):
    """
    A lean, customisable distributed spider base framework that schedules tasks through redis. (Distributed here means it scales horizontally: one machine can run multiple processes without changing the code or launching the python program over and over, and several machines can run the program at the same time.)
    A subclass only needs a few lines to override the request_and_extract method to get a concurrent, distributed spider project; much simpler than scrapy.

    Usage: BookingListPageSpider inherits BaseCustomSpider and overrides request_and_extract to do the parsing and persistence. Start it like this:
    BookingListPageSpider('booking:listpage_urls', threads_num=500).set_request_timeout(100).set_request_proxy('kuai').start_craw()  # start_craw does not block, so a detail-page spider can be started right after it in the same main thread.
    """
    lock = Lock()
    pool_schedu_task = BoundedThreadPoolExecutor(200)  # if redis is accessed over an external network there may be latency, so 10 scheduling threads are used per spider.
    def __init__(self, seed_key: str = None, request_method='get', threads_num=100, proxy_name='kuai', log_level=1):
        """
        :param seed_key: the redis key holding the seeds
        :param request_method: 'get' or 'post'
        :param threads_num: number of concurrent requests
        :param proxy_name: one of None, 'kuai', 'abuyun', 'crawlera'; None disables the proxy
        """
        self.__check_proxy_name(proxy_name)
        self._seed_key = seed_key
        self._request_metohd = request_method
        self._proxy_name = proxy_name
        self._threads_num = threads_num
        self.theadpool = BoundedThreadPoolExecutor(threads_num)
        self.logger.setLevel(log_level * 10)
        LogManager('RequestClient').get_logger_and_add_handlers(log_level)
        self._initialization_count()
        self._request_headers = None
        self._request_timeout = 60
        self._max_request_retry_times = 5  # how many times a failed request is retried
        self._max_parse_retry_times = 3  # how many times the request is retried after a parse error
        self._is_print_detail_exception = False
        self.logger.info(f'{self.__class__} was instantiated')
    @staticmethod
    def __check_proxy_name(proxy_name):
        if proxy_name not in (None, 'kuai', 'abuyun', 'crawlera'):
            raise ValueError('invalid proxy ip name')

    def _initialization_count(self):
        self._t1 = time.time()
        self._request_count = 0
        self._request_success_count = 0

    def set_max_request_retry_times(self, max_request_retry_times):
        self._max_request_retry_times = max_request_retry_times
        return self

    def set_request_headers(self, headers: dict):
        """
        e.g. set_request_headers({'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'})
        """
        self._request_headers = headers
        return self  # return self so calls can be chained

    def set_request_timeout(self, timeout: float):
        self._request_timeout = timeout
        return self

    def set_request_proxy(self, proxy_name):
        self.__check_proxy_name(proxy_name)
        self._proxy_name = proxy_name
        return self

    def set_print_detail_exception(self, is_print_detail_exception: bool):
        self._is_print_detail_exception = is_print_detail_exception
        return self

    def _calculate_count_per_minute(self, flag):
        with self.lock:
            if time.time() - self._t1 > 60:
                # _request_count, _request_success_count = self._request_count, self._request_success_count
                self.logger.info(f'{self.__class__} made {self._request_count} requests in the last minute, {self._request_success_count} succeeded; the redis key {self._seed_key} still holds {self.redis_db7.scard(self._seed_key)} seeds')
                self._initialization_count()
            if flag == 0:
                self._request_count += 1
            if flag == 1:
                self._request_success_count += 1
    def start_craw(self):
        # self._schedu_a_task()
        [self.pool_schedu_task.submit(self._schedu_a_task) for _ in range(10)]  # connecting to the broker over an external network adds transfer overhead and affects overall speed.
    # @decorators.tomorrow_threads(300)
    @decorators.keep_circulating(time_sleep=1)  # keeps the loop alive if redis misbehaves, so the program does not stop and need a manual restart.
    def _schedu_a_task(self):
        while True:
            seed_bytes = self.redis_db7.spop(self._seed_key)
            if seed_bytes:
                seed_dict = json.loads(seed_bytes)
                # noinspection PyProtectedMember
                self.logger.debug(f'current thread count is {len(self.theadpool._threads)}, seed is: {seed_dict}')
                self.theadpool.submit(self.__request_and_extract, seed_dict['url'], meta=seed_dict)
            else:
                self.logger.warning(f'the redis key {self._seed_key} is empty')
                time.sleep(2)

    # @decorators.handle_exception(50, )
    def _dispacth_request(self, url, data: dict = None, current_url_request_times=0):
        """
        :param url: the url to request
        :param current_url_request_times: how many times this url has already been attempted
        :param data: the data for a post request
        :return:
        """
        # self.__calculate_count_per_minute(0)
        if current_url_request_times < self._max_request_retry_times:
            if current_url_request_times > 0:
                pass
                # self.logger.debug(current_url_request_times)
            # noinspection PyBroadException
            try:
                resp = RequestClient(self._proxy_name, timeout=self._request_timeout).request_with_proxy(method=self._request_metohd, url=url, headers=self._request_headers, data=data)  # e.g. via the kuai proxy
            except Exception as e:
                self.logger.error(f'network error on request attempt {current_url_request_times + 1}: {e}', exc_info=0)
                self._calculate_count_per_minute(0)
                return self._dispacth_request(url, data, current_url_request_times + 1)
            else:
                if resp.status_code == 200:
                    self._calculate_count_per_minute(0)
                    self._calculate_count_per_minute(1)
                    return resp
                else:
                    self.logger.critical(f'response status is {resp.status_code} --> {url}')
                    self._calculate_count_per_minute(0)
                    return self._dispacth_request(url, data, current_url_request_times + 1)
        else:
            self.logger.critical(f'request for {url} still failed after the maximum of {self._max_request_retry_times} attempts')
            return None

    def put_seed_task_to_broker(self, seed_key: str, seed_dict: OrderedDict):
        seed_str = json.dumps(seed_dict)
        # self.redis_db7.sadd(redis_key, seed_str)
        RedisBulkWriteHelper(self.redis_db7, threshold=50).add_task(RedisOperation('sadd', seed_key, seed_str))

    def __request_and_extract(self, url, meta: OrderedDict, current_retry_times=0):
        # mainly because ThreadPoolExecutor does not surface an exception unless the future's result is retrieved, so without this wrapper errors would stay hidden.
        # noinspection PyBroadException
        if current_retry_times < self._max_parse_retry_times:
            try:
                self.request_and_extract(url, meta)
            except Exception as e:
                if isinstance(e, VolunteerErrorForSpiderRetry):
                    pass
                else:
                    self.logger.error(f'parse error on attempt {current_retry_times + 1} for url {url} \n {e}', exc_info=self._is_print_detail_exception)
                self.__request_and_extract(url, meta, current_retry_times + 1)
        else:
            self.logger.critical(f'parsing the page content of {url} still failed after the maximum of {self._max_parse_retry_times} attempts')

    # noinspection PyUnusedLocal
    @abc.abstractmethod
    def request_and_extract(self, url, meta: OrderedDict):
        """
        Subclasses must override this method to do the parsing and persist the data, or to push extracted second-level urls together with the parameters they carry to another broker key. If the spider needs multi-level page extraction, simply instantiate and run another spider of this kind for the next level.
        :param url:
        :param meta:
        :return:
        """
        """
        Always request the url through the _dispacth_request method instead of using requests directly; otherwise failed requests are not retried automatically, the per-minute request statistics are wrong, and the proxy ip settings have no effect.
        response = self._dispacth_request(url)
        print(response.text)
        """
        raise NotImplementedError


# noinspection PyUnresolvedReferences
class RabbitmqBrokerForSpiderMixin(metaclass=abc.ABCMeta):
    """
    rabbitmq is the recommended message broker.
    Using rabbitmq instead of redis as the broker means the program can be shut down at any point during the crawl without losing the current tasks. Inherit both this class and BaseCustomSpider, with this class listed first.
    """
    # noinspection PyArgumentEqualDefault
    LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
    lock_channel = Lock()

    @property
    @decorators.cached_method_result
    def _channel_publish(self):
        channel = RabbitMqHelper().creat_a_channel()
        return channel

    @decorators.cached_property
    def _channel_statistics(self):
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=self._seed_key, durable=True)
        return channel

    @decorators.keep_circulating(time_sleep=1)  # if the rabbitmq server goes down, consuming resumes automatically once it is repaired.
    def _schedu_a_task(self):
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=self._seed_key, durable=True)
        channel.basic_qos(prefetch_count=int(math.ceil(self._threads_num / 10)))  # limit the number of unacknowledged messages held by one consumer

        def callback(ch, method, properties, body):
            seed_dict = json.loads(body)
            # noinspection PyProtectedMember
            self.logger.debug(f'rabbitmq seed is: {seed_dict}')
            # self.__request_and_extract(ch, method, properties, seed_dict['url'], meta=seed_dict)
            self.theadpool.submit(self.__request_and_extract, ch, method, properties, seed_dict['url'], meta=seed_dict)

        channel.basic_consume(callback,
                              queue=self._seed_key,
                              # no_ack=True  # without acknowledgement, stopping the spider at will would lose some tasks.
                              )
        channel.start_consuming()

    def put_seed_task_to_broker(self, seed_key: str, seed_dict: OrderedDict):
        """
        Add a seed/task to the broker.
        :param seed_key: the broker key/queue for the seeds or tasks
        :param seed_dict: the task; it must be an OrderedDict, not a plain dict, otherwise identical tasks may be inserted more than once. The dict must contain at least a key named url; any additional keys can carry whatever initial task information is needed.
        :return:
        """
        with self.lock_channel:
            channel = self._channel_publish
            seed_str = json.dumps(seed_dict)
            channel.queue_declare(queue=seed_key, durable=True)
            channel.basic_publish(exchange='',
                                  routing_key=seed_key,
                                  body=seed_str,
                                  properties=BasicProperties(
                                      delivery_mode=2,  # make message persistent
                                  )
                                  )

    # noinspection PyArgumentEqualDefault
    def _calculate_count_per_minute(self, flag):
        with self.lock:
            if time.time() - self._t1 > 60:
                rabbitmq_queue = self._channel_statistics.queue_declare(
                    queue=self._seed_key, durable=True,
                    exclusive=False, auto_delete=False
                )
                self.logger.info(f'{self.__class__} made {self._request_count} requests in the last minute, {self._request_success_count} succeeded; the rabbitmq queue {self._seed_key} still holds {rabbitmq_queue.method.message_count} messages')
                self._initialization_count()
            if flag == 0:
                self._request_count += 1
            if flag == 1:
                self._request_success_count += 1
    # noinspection PyMethodOverriding
    def __request_and_extract(self, ch, method, properties, url, meta: OrderedDict, current_retry_times=0):
        # guards against the occasional page whose returned content is wrong and makes the parse fail.
        if current_retry_times < self._max_parse_retry_times:
            # noinspection PyBroadException
            try:
                self.request_and_extract(url, meta)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                if isinstance(e, VolunteerErrorForSpiderRetry):
                    pass
                else:
                    self.logger.error(f'parse error on attempt {current_retry_times + 1} for url {url} \n {e}', exc_info=self._is_print_detail_exception)
                self.__request_and_extract(ch, method, properties, url, meta, current_retry_times + 1)
        else:
            self.logger.critical(f'parsing the page content of {url} still failed after the maximum of {self._max_parse_retry_times} attempts')
            ch.basic_ack(delivery_tag=method.delivery_tag)


class BaseRabbitmqSpider(RabbitmqBrokerForSpiderMixin, BaseCustomSpider, metaclass=abc.ABCMeta):
    """
    This class can also be inherited directly instead of inheriting the mixin and BaseCustomSpider separately.
    """
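As a sketch of the multi-level extraction mentioned in the request_and_extract docstring, the example below chains a list-page spider into a detail-page spider over rabbitmq; because start_craw does not block, both spiders run in the same process. The module name custom_spider, both class names, the queue names and the fake link extraction are assumptions for illustration only.

# A hypothetical two-level crawl built on BaseRabbitmqSpider (names invented for illustration),
# assuming the framework code above is saved as custom_spider.py.
from collections import OrderedDict

from custom_spider import BaseRabbitmqSpider  # assumed module name for the framework code above


class CityListPageSpider(BaseRabbitmqSpider):
    def request_and_extract(self, url, meta: OrderedDict):
        response = self._dispacth_request(url)
        if response is None:
            return
        # Pretend each list page yields 20 detail links; push them as seeds for the next level.
        for i in range(20):
            self.put_seed_task_to_broker('demo:detailpage_urls', OrderedDict(url=f'{url}/detail/{i}', city=meta['city']))


class HotelDetailPageSpider(BaseRabbitmqSpider):
    def request_and_extract(self, url, meta: OrderedDict):
        response = self._dispacth_request(url)
        if response is None:
            return
        print('persist the detail page here:', url, meta['city'], len(response.text))


if __name__ == '__main__':
    list_spider = CityListPageSpider('demo:listpage_urls', threads_num=50)
    list_spider.put_seed_task_to_broker('demo:listpage_urls', OrderedDict(url='https://example.com/beijing', city='beijing'))
    # start_craw is non-blocking, so both levels run side by side in one process;
    # rabbitmq messages are only acknowledged after a successful parse, so the process can be stopped at any time.
    list_spider.start_craw()
    HotelDetailPageSpider('demo:detailpage_urls', threads_num=200).start_craw()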
https://www.cnblogs.com/ydf0509/p/9787669.html