
A custom distributed crawler-only framework for Python. Supports resumable crawling and guarantees 100% that no message is lost, even if the program is stopped or the computer loses power at any point during a crawl.


systemime
2021-05-12

Summary

0. This framework can only be used for crawlers: URL requests are scheduled by the framework and code has to be written the framework's way. It does not offer celery-style general-purpose distributed execution, and it is not convenient to test. For that there is another, function-oriented distributed framework that schedules arbitrary functions; it is compatible with any new or old code and covers any scenario that needs to be distributed.

A distributed crawler framework that is much simpler than scrapy: there is no switching back and forth between item, pipeline, middlewares, spider, settings and run files, everything lives in a single file, which saves a lot of development time. The structure is very loose: you only need to override one method and can parse and store results however you like, with no item definitions and no pipeline code. The built-in RequestClient supports simple cookie handling and one-step switching of IP proxies, so no middleware has to be written for any of that.
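As a minimal sketch of that workflow (the subclass name DemoListPageSpider and the Redis key 'demo:list_urls' are invented here purely for illustration; the framework classes themselves are listed further down):

class DemoListPageSpider(BaseCustomSpider):  # hypothetical subclass, for illustration only
    def request_and_extract(self, url, meta):
        response = self._dispacth_request(url)  # the framework's request helper: retries, proxy and per-minute statistics
        # parse response.text however you like and write it straight to your storage, no item or pipeline needed
        print(response.text[:100])

DemoListPageSpider('demo:list_urls', threads_num=100).set_request_timeout(60).start_craw()

Because start_craw returns immediately, several such spiders (for example one per page level) can be started from the same script.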

RabbitMQ is recommended as the message broker: it ensures every message is properly consumed, so the program can be stopped at any moment. With Redis as the broker, stopping the program at an arbitrary point loses the tasks that are currently being requested or have not yet been parsed and stored, and the more threads and processes are running, the more tasks are lost.
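A short sketch of where that loss happens on the Redis side (the key name 'demo:list_urls' is invented and a Redis server on localhost is assumed; this is an illustration, not part of the framework):

import redis

redis_conn = redis.Redis(db=7)
seed = redis_conn.spop('demo:list_urls')  # the seed is deleted from Redis the moment it is popped
if seed:
    # if the process is killed anywhere between this spop and the end of parsing/storage,
    # the seed is gone for good. With RabbitMQ and manual acknowledgements (see the mixin below),
    # the message is only removed after basic_ack is called at the end of processing, so an
    # unacked message is re-queued when the connection drops.
    print(seed)

The framework code follows.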

# coding=utf-8

import abc
import math
import json
import queue
import time
from collections import OrderedDict
# noinspection PyUnresolvedReferences
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import Lock
from pika import BasicProperties
# noinspection PyUnresolvedReferences
from app.utils_ydf import LoggerMixin, LogManager, MongoMixin, RedisMixin, RequestClient, decorators, RedisBulkWriteHelper, RedisOperation, MongoBulkWriteHelper, MysqlBulkWriteHelper, RabbitMqHelper


class BoundedThreadPoolExecutor(ThreadPoolExecutor):
    """A ThreadPoolExecutor with a bounded work queue, so submits block instead of piling tasks up in memory."""

    def __init__(self, max_workers=None, thread_name_prefix=''):
        super().__init__(max_workers, thread_name_prefix)
        self._work_queue = queue.Queue(max_workers * 2)


class StatusError(Exception):
    pass

class VolunteerErrorForSpiderRetry(Exception):
    """
    If an error of this type is caught by __request_and_extract, no error log is written; it exists only to trigger a retry.
    """

# noinspection PyBroadException

class BaseCustomSpider(LoggerMixin, MongoMixin, RedisMixin, metaclass=abc.ABCMeta):
    """
    A lean, custom distributed base crawler framework that schedules tasks through Redis (distributed meaning horizontally scalable:
    one machine can run multiple processes without changing code or repeatedly starting the Python program, and several machines can run the program at once).
    A subclass only needs a few lines overriding request_and_extract to build a concurrent, distributed crawler project quickly; much simpler than scrapy.

    Usage: BookingListPageSpider inherits BaseCustomSpider and overrides request_and_extract to do the parsing and storage. Start it like this:
    BookingListPageSpider('booking:listpage_urls', threads_num=500).set_request_timeout(100).set_request_proxy('kuai').start_craw()
    # start_craw is non-blocking, so a detail-page spider can be run right after it in the same main thread.
    """
    lock = Lock()
    pool_schedu_task = BoundedThreadPoolExecutor(200)  # Redis over a public network may add latency, so several scheduling threads are used (start_craw submits 10 scheduling tasks).

    def __init__(self, seed_key: str = None, request_method='get', threads_num=100, proxy_name='kuai', log_level=1):
        """
        :param seed_key: the seed key in Redis
        :param request_method: request method, get or post
        :param threads_num: number of concurrent requests
        :param proxy_name: one of None, 'kuai', 'abuyun', 'crawlera'; None means no proxy is used
        """
        self.__check_proxy_name(proxy_name)
        self._seed_key = seed_key
        self._request_metohd = request_method
        self._proxy_name = proxy_name
        self._threads_num = threads_num
        self.theadpool = BoundedThreadPoolExecutor(threads_num)
        self.logger.setLevel(log_level * 10)
        LogManager('RequestClient').get_logger_and_add_handlers(log_level)
        self._initialization_count()
        self._request_headers = None
        self._request_timeout = 60
        self._max_request_retry_times = 5  # how many times a failed request is retried
        self._max_parse_retry_times = 3  # how many times a parse error triggers a re-request
        self._is_print_detail_exception = False
        self.logger.info(f'{self.__class__} instantiated')

    @staticmethod
    def __check_proxy_name(proxy_name):
        if proxy_name not in (None, 'kuai', 'abuyun', 'crawlera'):
            raise ValueError('invalid proxy name')

    def _initialization_count(self):
        self._t1 = time.time()
        self._request_count = 0
        self._request_success_count = 0

    def set_max_request_retry_times(self, max_request_retry_times):
        self._max_request_retry_times = max_request_retry_times
        return self

    def set_request_headers(self, headers: dict):
        """
        self.request_headers = {'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36'}
        """
        self._request_headers = headers
        return self  # return self so calls can be chained

    def set_request_timeout(self, timeout: float):
        self._request_timeout = timeout
        return self

    def set_request_proxy(self, proxy_name):
        self.__check_proxy_name(proxy_name)
        self._proxy_name = proxy_name
        return self

    def set_print_detail_exception(self, is_print_detail_exception: bool):
        self._is_print_detail_exception = is_print_detail_exception
        return self

    def _calculate_count_per_minute(self, flag):
        with self.lock:
            if time.time() - self._t1 > 60:
                # _request_count, _request_success_count = self._request_count, self._request_success_count
                self.logger.info(f'{self.__class__} made {self._request_count} requests in the last minute, {self._request_success_count} succeeded; the Redis key {self._seed_key} still holds {self.redis_db7.scard(self._seed_key)} seeds')
                self._initialization_count()
            if flag == 0:
                self._request_count += 1
            if flag == 1:
                self._request_success_count += 1

    def start_craw(self):
        # self._schedu_a_task()
        [self.pool_schedu_task.submit(self._schedu_a_task) for _ in range(10)]  # connecting to the broker over a public network adds transfer overhead and slows everything down, so 10 scheduling tasks are submitted.

    # @decorators.tomorrow_threads(300)
    @decorators.keep_circulating(time_sleep=1)  # keep running even if Redis raises an error, so the program does not stop and need a manual restart.
    def _schedu_a_task(self):
        while True:
            seed_bytes = self.redis_db7.spop(self._seed_key)
            if seed_bytes:
                seed_dict = json.loads(seed_bytes)
                # noinspection PyProtectedMember
                self.logger.debug(f'current thread count is {len(self.theadpool._threads)}, seed: {seed_dict}')
                self.theadpool.submit(self.__request_and_extract, seed_dict['url'], meta=seed_dict)
            else:
                self.logger.warning(f'the Redis key {self._seed_key} is empty')
                time.sleep(2)

    # @decorators.handle_exception(50, )
    def _dispacth_request(self, url, data: dict = None, current_url_request_times=0):
        # self.__calculate_count_per_minute(0)
        """
        :param url: the url to request
        :param current_url_request_times: how many times this url has already been attempted
        :param data: data for a post request
        :return:
        """
        if current_url_request_times < self._max_request_retry_times:
            if current_url_request_times > 0:
                pass
                # self.logger.debug(current_url_request_times)
            # noinspection PyBroadException
            try:
                resp = RequestClient(self._proxy_name, timeout=self._request_timeout).request_with_proxy(method=self._request_metohd, url=url, headers=self._request_headers, data=data)  # request through the configured proxy, e.g. kuai
            except Exception as e:
                self.logger.error(f'network error on request attempt {current_url_request_times + 1}: {e}', exc_info=0)
                self._calculate_count_per_minute(0)
                return self._dispacth_request(url, data, current_url_request_times + 1)
            else:
                if resp.status_code == 200:
                    self._calculate_count_per_minute(0)
                    self._calculate_count_per_minute(1)
                    return resp
                else:
                    self.logger.critical(f'response status is {resp.status_code}  --> {url}')
                    self._calculate_count_per_minute(0)
                    return self._dispacth_request(url, data, current_url_request_times + 1)
        else:
            self.logger.critical(f'request to {url} still failed after the maximum of {self._max_request_retry_times} attempts')
            return None

    def put_seed_task_to_broker(self, seed_key: str, seed_dict: OrderedDict):
        seed_str = json.dumps(seed_dict)
        # self.redis_db7.sadd(redis_key, seed_str)
        RedisBulkWriteHelper(self.redis_db7, threshold=50).add_task(RedisOperation('sadd', seed_key, seed_str))

    def __request_and_extract(self, url, meta: OrderedDict, current_retry_times=0):
        # mainly because ThreadPoolExecutor swallows errors when the future's result is never collected, so they would otherwise stay hidden
        # noinspection PyBroadException
        if current_retry_times < self._max_parse_retry_times:
            try:
                self.request_and_extract(url, meta)
            except Exception as e:
                if isinstance(e, VolunteerErrorForSpiderRetry):
                    pass
                else:
                    self.logger.error(f'parse error on attempt {current_retry_times + 1} for url {url}  \n {e}', exc_info=self._is_print_detail_exception)
                self.__request_and_extract(url, meta, current_retry_times + 1)
        else:
            self.logger.critical(f'parsing the page content of {url} still failed after the maximum of {self._max_parse_retry_times} attempts')

    # noinspection PyUnusedLocal

    @abc.abstractmethod
    def request_and_extract(self, url, meta: OrderedDict):
        """
        Subclasses must override this method to do the parsing and storage, or to push extracted second-level urls plus any parameters to pass along into some Redis key.
        If the crawler needs multi-level page extraction, just instantiate another subclass of this class and run it.
        :param url:
        :param meta:
        :return:
        """
        """
        The url must be requested with the _dispacth_request method; do not use requests directly, otherwise automatic retries on request errors, the per-minute request statistics, and the proxy ip setting will not take effect.
        response = self._dispacth_request(url)
        print(response.text)
        """
        raise NotImplementedError


# noinspection PyUnresolvedReferences
class RabbitmqBrokerForSpiderMixin(metaclass=abc.ABCMeta):
    """
    RabbitMQ is the recommended message broker.
    Using RabbitMQ instead of Redis as the broker means the program can be shut down at any point during a crawl without losing the tasks in flight.
    Inherit from both this class and BaseCustomSpider, with this class placed first in the inheritance list.
    """

    # noinspection PyArgumentEqualDefault
    LogManager('pika.heartbeat').get_logger_and_add_handlers(1)
    lock_channel = Lock()

    @property
    @decorators.cached_method_result
    def _channel_publish(self):
        channel = RabbitMqHelper().creat_a_channel()
        return channel

    @decorators.cached_property
    def _channel_statistics(self):
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=self._seed_key, durable=True)
        return channel

    @decorators.keep_circulating(time_sleep=1)  # if the RabbitMQ server goes down, recover automatically once it is back.
    def _schedu_a_task(self):
        channel = RabbitMqHelper().creat_a_channel()
        channel.queue_declare(queue=self._seed_key, durable=True)
        channel.basic_qos(prefetch_count=int(math.ceil(self._threads_num / 10)))

        def callback(ch, method, properties, body):
            seed_dict = json.loads(body)
            # noinspection PyProtectedMember
            self.logger.debug(f'rabbitmq seed: {seed_dict}')
            # self.__request_and_extract(ch, method, properties, seed_dict['url'], meta=seed_dict)
            self.theadpool.submit(self.__request_and_extract, ch, method, properties, seed_dict['url'], meta=seed_dict)

        channel.basic_consume(callback,
                              queue=self._seed_key,
                              # no_ack=True  # no acknowledgement; without acks, stopping the spider at will would lose some tasks.
                              )
        channel.start_consuming()

    def put_seed_task_to_broker(self, seed_key: str, seed_dict: OrderedDict):
        """
        Add a seed or task to the broker.
        :param seed_key: the key/queue the seed or task is stored under
        :param seed_dict: the task; it must be an OrderedDict rather than a plain dict, otherwise duplicate tasks can end up in the broker. It must contain at least a key named url, and any other keys can be added to carry initial task information.
        :return:
        """
        with self.lock_channel:
            channel = self._channel_publish
            seed_str = json.dumps(seed_dict)
            channel.queue_declare(queue=seed_key, durable=True)
            channel.basic_publish(exchange='',
                                  routing_key=seed_key,
                                  body=seed_str,
                                  properties=BasicProperties(
                                      delivery_mode=2,  # make message persistent
                                  )
                                  )

    # noinspection PyArgumentEqualDefault
    def _calculate_count_per_minute(self, flag):
        with self.lock:
            if time.time() - self._t1 > 60:
                rabbitmq_queue = self._channel_statistics.queue_declare(queue=self._seed_key, durable=True, exclusive=False, auto_delete=False)
                self.logger.info(f'{self.__class__} made {self._request_count} requests in the last minute, {self._request_success_count} succeeded; the RabbitMQ queue {self._seed_key} still holds {rabbitmq_queue.method.message_count} messages')
                self._initialization_count()
            if flag == 0:
                self._request_count += 1
            if flag == 1:
                self._request_success_count += 1

    # noinspection PyMethodOverriding
    def __request_and_extract(self, ch, method, properties, url, meta: OrderedDict, current_retry_times=0):
        # guards against the page occasionally returning wrong content and making the parse fail.
        if current_retry_times < self._max_parse_retry_times:
            # noinspection PyBroadException
            try:
                self.request_and_extract(url, meta)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                if isinstance(e, VolunteerErrorForSpiderRetry):
                    pass
                else:
                    self.logger.error(f'parse error on attempt {current_retry_times + 1} for url {url}  \n {e}', exc_info=self._is_print_detail_exception)
                self.__request_and_extract(ch, method, properties, url, meta, current_retry_times + 1)
        else:
            self.logger.critical(f'parsing the page content of {url} still failed after the maximum of {self._max_parse_retry_times} attempts')
            ch.basic_ack(delivery_tag=method.delivery_tag)


class BaseRabbitmqSpider(RabbitmqBrokerForSpiderMixin, BaseCustomSpider, metaclass=abc.ABCMeta):
    """
    This class can also be inherited from directly.
    """
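For the RabbitMQ variant, a minimal usage sketch (the subclass name DemoDetailPageSpider and the queue name 'demo:detail_urls' are invented for illustration, and the sketch assumes it lives in the same module as the classes above):

class DemoDetailPageSpider(BaseRabbitmqSpider):  # hypothetical subclass, for illustration only
    def request_and_extract(self, url, meta):
        response = self._dispacth_request(url)
        # parse response.text and write the result wherever you like
        print(response.status_code, meta)

spider = DemoDetailPageSpider('demo:detail_urls', threads_num=200)
# seeds must be OrderedDicts that contain at least a 'url' key
spider.put_seed_task_to_broker('demo:detail_urls', OrderedDict(url='https://example.com/item/1', page=1))
spider.start_craw()

Because the consumer only acks after request_and_extract returns, killing this process before a seed finishes simply leaves the message unacknowledged and it is redelivered later.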

Source: https://www.cnblogs.com/ydf0509/p/9787669.html
