Support connecting to sentinel with password #3

Open
wants to merge 43 commits into base: master

Changes from all commits (43 commits)
065f94d
Support connecting to sentinel with password
luzihang123 Jan 27, 2022
777900a
Import Iterable via collections.abc for compatibility with newer Python versions
luzihang123 Jan 27, 2022
4e1c5c3
add:dupefilter/filtered
luzihang123 Apr 11, 2022
b60c1e0
update README.md
luzihang123 Apr 11, 2022
ffda2f7
update: scrapy_redis_sentinel --> mob_scrapy_redis_sentinel
luzihang123 Apr 11, 2022
024a8dd
add: automatic "make request" and "get request from next_request" logging, both including trace_id
luzihang123 Apr 13, 2022
a76d01e
fix
luzihang123 Apr 13, 2022
6f784a4
update README.md
luzihang123 Apr 14, 2022
76263da
feat: add LATEST_QUEUE_KEY, backup of the most recent queue batch (task-loss prevention)
luzihang123 Jun 2, 2022
7d6d3bb
update: setup.py
luzihang123 Jun 2, 2022
42dd20e
fix: generator func
luzihang123 Jun 2, 2022
68d006d
update
luzihang123 Jun 2, 2022
d688adc
update
luzihang123 Jun 2, 2022
e694c6d
update
luzihang123 Jun 2, 2022
352a10a
update
luzihang123 Jun 2, 2022
97a5b91
update
luzihang123 Jun 2, 2022
35878fa
update
luzihang123 Jun 2, 2022
6d3345c
update
luzihang123 Jun 2, 2022
825b2aa
update
luzihang123 Jun 2, 2022
c045059
update
luzihang123 Jun 2, 2022
fa7f889
update version from 0.4 to 0.5
Jun 6, 2022
1fd8cba
update version from 0.4 to 0.5
Jun 6, 2022
cd74165
update
luzihang123 Jun 6, 2022
9559b8d
add rocket mq usage
luzihang123 Jun 13, 2022
6fe8a37
update get_queue_size queueSize type
luzihang123 Jun 13, 2022
f998762
fix None data error
luzihang123 Jun 14, 2022
628873c
modify queue_name, can run according env dev/prod
luzihang123 Jun 14, 2022
6ba35b0
add a default value for redis_key (defaults to the spider name)
luzihang123 Jun 14, 2022
7798f1c
add a default value for redis_key (defaults to the spider name)
luzihang123 Jun 14, 2022
50669db
change the environment-detection condition to use the IP
luzihang123 Jun 15, 2022
c4557f4
change the environment-detection condition to use the IP
luzihang123 Jun 15, 2022
07e3857
change the environment-detection condition to use the IP
luzihang123 Jun 15, 2022
a1dd180
change the environment-detection condition to use the IP
luzihang123 Jun 15, 2022
927931a
change the environment-detection condition to use the IP
luzihang123 Jun 15, 2022
d9ba5b9
check whether the queue exists when the spider initializes
luzihang123 Jun 16, 2022
8c49b0f
update version
luzihang123 Jun 17, 2022
a1ffcd8
update README
luzihang123 Jun 22, 2022
540d5c0
update MongoDupeFilter
luzihang123 Jun 22, 2022
f1074f7
update: handle concurrent duplicates from MQ
luzihang123 Jun 24, 2022
c03cb8f
update: version
luzihang123 Jun 24, 2022
63a69f4
update: mob-tools==0.0.17
luzihang123 Jun 24, 2022
cfd6029
update: check whether MQ is in use
luzihang123 Jun 24, 2022
6cebedb
update: shorten the expiration time
luzihang123 Jun 24, 2022
2 changes: 2 additions & 0 deletions .gitignore
@@ -3,6 +3,8 @@ __pycache__/
*.py[cod]
*$py.class

.idea

# C extensions
*.so

28 changes: 21 additions & 7 deletions README.md
@@ -5,6 +5,18 @@
![GitHub last commit](https://img.shields.io/github/last-commit/crawlmap/scrapy-redis-sentinel)
![PyPI - Downloads](https://img.shields.io/pypi/dw/scrapy-redis-sentinel)

This project is based on the original [scrapy-redis-sentinel](https://github.com/crawlaio/scrapy-redis-sentinel) project,

with the following modifications:

1. Added support for Redis Sentinel deployments that use two passwords (one for the sentinel nodes, one for Redis itself)
2. Python 3.8+ support (`Iterable` is imported via `collections.abc`; see the sketch below)
3. Filled in the "dupefilter/filtered" stat missing from `dupefilter.py`, which helps when analysing crawl progress
4. Automatically attaches a track_id to the "make request from data" and "get request from next_request" logs
5. Task-loss prevention: the latest batch of tasks is backed up each time, and on spider start the backup is pushed back to the head of the queue. See `defaults.LATEST_QUEUE_KEY`
6. Added task scheduling via shield: `MQ_USED`
-----
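A minimal sketch of the compatibility import referenced in item 2 (the exact location inside the package may differ):

```python
# Python 3.10+ removed the ABCs from `collections`, so Iterable must come
# from `collections.abc`; fall back for older interpreters.
try:
    from collections.abc import Iterable
except ImportError:
    from collections import Iterable
```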

This project is based on the original [scrapy-redis](https://github.com/rmax/scrapy-redis) project,

with the following modifications:
@@ -53,6 +65,8 @@ REDIS_SENTINELS = [
('172.25.2.26', 26379),
('172.25.2.27', 26379)
]
# SENTINEL_KWARGS is optional; use it to set the sentinel password, see https://github.com/redis/redis-py/issues/1219
SENTINEL_KWARGS = {'password': 'sentinel_password'}

# REDIS_SENTINEL_PARAMS: configuration parameters for sentinel mode.
REDIS_SENTINEL_PARAMS = {
@@ -80,22 +94,22 @@ REDIS_CLUSTER_PARAMS = {
# Keep the queues scrapy-redis uses in Redis so crawls can be paused and resumed, i.e. do not clear the Redis queues
SCHEDULER_PERSIST = True
# Scheduler queue
SCHEDULER = "scrapy_redis_sentinel.scheduler.Scheduler"
SCHEDULER = "mob_scrapy_redis_sentinel.scheduler.Scheduler"
# Basic dupefilter
DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter"
# BloomFilter
# DUPEFILTER_CLASS = "scrapy_redis_sentinel.dupefilter.RedisBloomFilter"
# DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisBloomFilter"

# Enable Redis-based stats collection
STATS_CLASS = "scrapy_redis_sentinel.stats.RedisStatsCollector"
STATS_CLASS = "mob_scrapy_redis_sentinel.stats.RedisStatsCollector"

# Queue class used to order the requests to crawl
# Default: ordering by priority (Scrapy's default), a non-FIFO/LIFO ordering implemented with a sorted set.
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderPriorityQueue'
# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderPriorityQueue'
# Optional: first-in-first-out (FIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack'
# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderQueue'
# Optional: last-in-first-out (LIFO)
# SCHEDULER_QUEUE_CLASS = 'scrapy_redis_sentinel.queue.SpiderStack'
# SCHEDULER_QUEUE_CLASS = 'mob_scrapy_redis_sentinel.queue.SpiderStack'
```

> Note: the standalone Redis settings do not take effect when cluster mode is used
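As a hedged illustration of the sentinel settings above (not part of the package), this is roughly how the two passwords map onto redis-py; `mymaster` and both passwords are placeholders:

```python
from redis.sentinel import Sentinel

# sentinel_kwargs applies to the sentinel nodes themselves (SENTINEL_KWARGS),
# while the Redis master connection carries its own password (REDIS_SENTINEL_PARAMS).
sentinel = Sentinel(
    [("172.25.2.26", 26379), ("172.25.2.27", 26379)],
    socket_timeout=0.5,
    sentinel_kwargs={"password": "sentinel_password"},
)
master = sentinel.master_for("mymaster", password="redis_password")
master.ping()
```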
19 changes: 19 additions & 0 deletions mob_scrapy_redis_sentinel/__init__.py
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-

__original_author__ = "Rolando Espinoza"
__author__ = "luzihang"
__email__ = "[email protected]"
__version__ = "1.0"

from mob_tools.mobLog import MobLoguru
from mob_tools.inner_ip import get_inner_ip

inner_ip = get_inner_ip()

PRODUCTION_ENV_TAG = '10.90'
# Anything whose inner IP does not start with 10.90 is treated as non-production
if inner_ip.startswith(PRODUCTION_ENV_TAG):
mob_log = MobLoguru(deep=2, log_file='/data/logs/crawler/crawler.log.es')
else:
mob_log = MobLoguru()
inner_ip = "127.0.0.1"
@@ -152,6 +152,7 @@ def get_redis_sentinel_from_settings(settings):
params.update(settings.getdict("REDIS_SENTINEL_PARAMS"))
params.setdefault("sentinels", settings.get("REDIS_SENTINELS"))
params.setdefault("socket_timeout", settings.get("REDIS_SENTINELS_SOCKET_TIMEOUT"))
params.setdefault("sentinel_kwargs", settings.get("SENTINEL_KWARGS"))
return get_redis_sentinel(**params)


@@ -165,7 +166,10 @@ def get_redis_sentinel(**kwargs):
redis_sentinel_cls = kwargs.get("redis_cluster_cls", defaults.REDIS_SENTINEL_CLS)
sentinels = kwargs.pop("sentinels", None)
socket_timeout = kwargs.pop("socket_timeout", 0.5)
redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels, socket_timeout=socket_timeout)
sentinel_kwargs = kwargs.pop("sentinel_kwargs", None)
redis_sentinel_cls = redis_sentinel_cls(sentinels=sentinels,
socket_timeout=socket_timeout,
sentinel_kwargs=sentinel_kwargs)
redis_cls = redis_sentinel_cls.master_for(**kwargs)
return redis_cls
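A minimal usage sketch for the updated helper, assuming it lives in `mob_scrapy_redis_sentinel.connection` and that the remaining kwargs (such as `service_name` and `password`) are forwarded to `master_for()` as shown above; host names and passwords are placeholders:

```python
from mob_scrapy_redis_sentinel.connection import get_redis_sentinel

# sentinels / socket_timeout / sentinel_kwargs are popped here; everything else
# (service_name, password, db, ...) goes to Sentinel.master_for().
redis_client = get_redis_sentinel(
    sentinels=[("172.25.2.26", 26379), ("172.25.2.27", 26379)],
    socket_timeout=0.5,
    sentinel_kwargs={"password": "sentinel_password"},
    service_name="mymaster",
    password="redis_password",
)
redis_client.ping()
```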

66 changes: 66 additions & 0 deletions mob_scrapy_redis_sentinel/defaults.py
@@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
import redis
import os
import rediscluster
from redis.sentinel import Sentinel

from mob_scrapy_redis_sentinel import inner_ip, mob_log

# For standalone use.
DUPEFILTER_KEY = "dupefilter:%(timestamp)s"

PIPELINE_KEY = "%(spider)s:items"

STATS_KEY = '%(spider)s:stats'

REDIS_CLS = redis.StrictRedis
REDIS_CLUSTER_CLS = rediscluster.RedisCluster
REDIS_SENTINEL_CLS = Sentinel

REDIS_ENCODING = "utf-8"
# Sane connection defaults.
REDIS_PARAMS = {
"socket_timeout": 30,
"socket_connect_timeout": 30,
"retry_on_timeout": True,
"encoding": REDIS_ENCODING
}

SCHEDULER_QUEUE_KEY = "%(spider)s:requests"
SCHEDULER_QUEUE_CLASS = "mob_scrapy_redis_sentinel.queue.PriorityQueue"
SCHEDULER_DUPEFILTER_KEY = "%(spider)s:dupefilter"
SCHEDULER_DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.RedisDupeFilter"

SCHEDULER_PERSIST = False

START_URLS_KEY = "%(name)s:start_urls"
START_URLS_AS_SET = False
START_URLS_AS_ZSET = False

# Backup of the most recent queue batch (task-loss prevention)
"""
When the spider opens, read LATEST_QUEUE_KEY to recover the last batch of queue data saved before the previous stop;
on every "make request from data", back the batch up to LATEST_QUEUE_KEY and delete the previous backup.
(With multiple workers deleting the same LATEST_QUEUE_KEY, how do we keep them from interfering with each other?)
"""
LATEST_QUEUE_KEY = "%(name)s:latest_queue"

"""
Fetch tasks from MQ
"""
MQ_USED = False  # disabled by default

MQ_HOST = "http://10.89.104.148:10011"
# Create a queue
CREATE_QUEUE = MQ_HOST + "/rest/ms/GemMQ/createQueue?queueName={queueName}"
# Pop a message from the given queue
POP_MESSAGE = MQ_HOST + "/rest/ms/GemMQ/popMessage?queueName={queueName}"
# Get the size of the message queue
GET_QUEUE_SIZE = MQ_HOST + "/rest/ms/GemMQ/getQueueSize?queueName={queueName}"

# Environment-dependent configuration
PRODUCTION_ENV_TAG = '10.90'
# Anything whose inner IP does not start with 10.90 is treated as non-production
if inner_ip.startswith(PRODUCTION_ENV_TAG):
QUEUE_NAME_PREFIX = "CRAWLER-UQ-{}"
else:
QUEUE_NAME_PREFIX = "CRAWLER-SANDBOX-UQ-{}"
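For illustration only, a sketch of how these MQ endpoints might be called; the spider name and the JSON layout of the responses are assumptions, not something documented here:

```python
import requests

from mob_scrapy_redis_sentinel import defaults

# Queue names combine the environment prefix with the spider name.
queue_name = defaults.QUEUE_NAME_PREFIX.format("my_spider")

# Pop one task from the queue (response assumed to be JSON).
message = requests.get(defaults.POP_MESSAGE.format(queueName=queue_name), timeout=5).json()

# Check how many tasks are still waiting.
size = requests.get(defaults.GET_QUEUE_SIZE.format(queueName=queue_name), timeout=5).json()
```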
@@ -2,6 +2,9 @@
import logging
import time

import pymongo
from pymongo.errors import DuplicateKeyError

from scrapy.dupefilters import BaseDupeFilter
from scrapy.utils.request import request_fingerprint

@@ -12,6 +15,74 @@
logger = logging.getLogger(__name__)


class MongoDupeFilter(BaseDupeFilter):
def __init__(self, mongo_uri, db, collection, debug=False, *args, **kwargs):
self.mongo = pymongo.MongoClient(mongo_uri)
self.mongo_db = db
self.collection = collection
self.debug = debug
self.logdupes = True
self.mongo[self.mongo_db][self.collection].create_index("fp", unique=True)

@classmethod
def from_settings(cls, settings):
mongo_uri = settings.get("MongoFilter_URI")
mongo_db = settings.get("MongoFilter_DB")
collection = defaults.DUPEFILTER_KEY % {"timestamp": int(time.time())}
debug = settings.getbool("DUPEFILTER_DEBUG", False)
return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug)

@classmethod
def from_crawler(cls, crawler):
return cls.from_settings(crawler.settings)

def request_seen(self, request):
fp = self.request_fingerprint(request)
# This returns the number of values added, zero if already exists.
doc = self.mongo[self.mongo_db][self.collection].find_one({"fp": fp})
if doc:
return True
try:
self.mongo[self.mongo_db][self.collection].insert_one({"fp": fp})
except DuplicateKeyError:
pass
return False

def request_fingerprint(self, request):
return request_fingerprint(request)

@classmethod
def from_spider(cls, spider):
settings = spider.settings
mongo_uri = settings.get("MongoFilter_URI")
mongo_db = settings.get("MongoFilter_DB")
dupefilter_key = settings.get("SCHEDULER_DUPEFILTER_KEY", defaults.SCHEDULER_DUPEFILTER_KEY)
collection = dupefilter_key % {"spider": spider.name}
debug = settings.getbool("DUPEFILTER_DEBUG")
return cls(mongo_uri=mongo_uri, db=mongo_db, collection=collection, debug=debug)

def close(self, reason=""):
self.clear()

def clear(self):
self.mongo.drop_collection(self.collection)

def log(self, request, spider):
if self.debug:
msg = "Filtered duplicate request: %(request)s"
self.logger.debug(msg, {"request": request}, extra={"spider": spider})
elif self.logdupes:
msg = (
"Filtered duplicate request %(request)s"
" - no more duplicates will be shown"
" (see DUPEFILTER_DEBUG to show all duplicates)"
)
self.logger.debug(msg, {"request": request}, extra={"spider": spider})
self.logdupes = False

spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider)


# TODO: Rename class to RedisDupeFilter.
class RedisDupeFilter(BaseDupeFilter):
"""Redis-based request duplicates filter.
@@ -159,6 +230,8 @@ def log(self, request, spider):
self.logger.debug(msg, {"request": request}, extra={"spider": spider})
self.logdupes = False

spider.crawler.stats.inc_value("dupefilter/filtered", spider=spider)


class RedisBloomFilter(BaseDupeFilter):
"""Redis-based request duplicates filter.
@@ -195,7 +268,7 @@ def from_settings(cls, settings):
"""Returns an instance from given settings.

This uses by default the key ``dupefilter:<timestamp>``. When using the
``scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as
``mob_scrapy_redis_sentinel.scheduler.Scheduler`` class, this method is not used as
it needs to pass the spider name in the key.

Parameters
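To turn on the Mongo-based filter added above, settings roughly like these would be needed; the URI and database name are placeholders, while the setting keys mirror what `from_settings`/`from_spider` read:

```python
# settings.py (sketch)
DUPEFILTER_CLASS = "mob_scrapy_redis_sentinel.dupefilter.MongoDupeFilter"
MongoFilter_URI = "mongodb://user:password@127.0.0.1:27017"
MongoFilter_DB = "crawler_dupefilter"
DUPEFILTER_DEBUG = False
```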
File renamed without changes.
@@ -6,6 +6,9 @@

from . import connection, defaults

from mob_scrapy_redis_sentinel import mob_log
from mob_scrapy_redis_sentinel.utils import get_track_id


# TODO: add SCRAPY_JOB support.
class Scheduler(object):
@@ -33,16 +36,16 @@ class Scheduler(object):
"""

def __init__(
self,
server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None
self,
server,
persist=False,
flush_on_start=False,
queue_key=defaults.SCHEDULER_QUEUE_KEY,
queue_cls=defaults.SCHEDULER_QUEUE_CLASS,
dupefilter_key=defaults.SCHEDULER_DUPEFILTER_KEY,
dupefilter_cls=defaults.SCHEDULER_DUPEFILTER_CLASS,
idle_before_close=0,
serializer=None
):
"""Initialize scheduler.

@@ -174,7 +177,9 @@ def enqueue_request(self, request):
def next_request(self):
block_pop_timeout = self.idle_before_close
request = self.queue.pop(block_pop_timeout)

if request and self.stats:
mob_log.info(f"get request from next_request: spider name: {self.spider.name}, {request.__dict__}").track_id(get_track_id(request)).commit()
self.stats.inc_value("scheduler/dequeued/redis", spider=self.spider)
return request
