scrapy笔记-通用配置
1,调试scrapy
在工作中经常会用到调试功能, 下面是一种scrapy提供的方法, 代码如下:
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings
...
if __main__ == "__main__":
process = CrawlerProcess(get_project_settings())
process.crawl("demo") # 你需要将此处的spider_name替换为你自己的爬虫名称
process.start()
2,读取数据库,获取起始链接(或构造起始链接)
有时候, scrapy start_urls 需要从数据库获取
def start_requests(self):
db = pymysql.connect(host=settings.MYSQL_HOST,
user=settings.MYSQL_USER,
password=settings.MYSQL_PASSWORD,
database=settings.MYSQL_DATABASE,
port=settings.MYSQL_PORT
)
with db.cursor() as cursor:
cursor.execute("select url from url_table")
result = cursor.fetchall()
db.close()
for url in result:
# yield scrapy.Request(f"http://www.baidu.com/s?wd={url[0]}") # 自己通过数据库返回的字段进行构造链接
yield scrapy.Request(url[0])
3,动态添加待爬url
同2, 将mysql部分去掉, 修改自己的就可以
4,常用设置, 自定义设置
有时候需要覆盖settings文件中的设置, 就在spider文件中写custom_settings, 然后在里面配置一些信息就可以了
class MySpider(scrapy.Spider):
...
custom_settings = {
"DEPTH_LIMIT": 5, # 爬取层速
"JOBDIR": "路径" # 在指定路径保存当前的进度,当中途退出爬虫后,再次运行时可以从这个文件中读取之前的进度,继续爬取.
"HTTPERROR_ALLOWED_CODES" : [302] # 302为暂时性的链接重置,当出现302时,应该继续深入爬取内容,所以我们不忽略302的报错,一般会设置为[301,302],301为永久性的链接重置.
"AUTOTHROTTLE_ENABLED" : True # 自动限速,可以参考网址: https://scrapy-chs.readthedocs.io/zh_CN/0.24/topics/autothrottle.html
}
更多配置请参考主页另一篇文章
5,常用pipeline
-
mysql pipeline
-
存在更新, 不存在插入
class MysqlUpdatePipeline: def __init__(self): self.database = settings.MYSQL_DATABASE self.port = settings.MYSQL_PORT self.password = settings.MYSQL_PASSWORD self.user = settings.MYSQL_USER self.host = settings.MYSQL_HOST self.cursor = None self.db = None def open_spider(self, spider): self.db = pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database, port=self.port ) self.cursor = self.db.cursor() def process_item(self, item, spider): data = dict(item) keys = ", ".join(data.keys()) values = ", ".join(["% s"] * len(data)) update = ", ".join( [f"{tuple(item.keys())[i]} = % s" for i in range(len(data))]) sql = "insert into % s (% s) values (% s) on duplicate key update % s" % ( item.table, keys, values, update) try: # 检查连接是否断开,如果断开就进行重连 self.db.ping(reconnect=True) print("mysql超时重连") # 使用 execute() 执行sql self.cursor.execute(sql, tuple(data.values()) * 2) # 提交事务 self.db.commit() except Exception as e: print("操作出现错误:{}".format(e)) # 回滚所有更改 self.db.rollback() return item def close_spider(self, spider): self.db.close()
-
直接插入, 操作跟更新差不多, 就将process_item中代码覆盖或者修改即可
class MysqlPipeline: ... def process_item(self, item, spider): data = dict(item) keys = ", ".join(data.keys()) values = ", ".join(["% s"] * len(data)) sql = "insert into % s (% s) values (% s) on duplicate key update % s" % ( item.table, keys, values) try: # 检查连接是否断开,如果断开就进行重连 self.db.ping(reconnect=True) self.cursor.execute(sql, data.values()) self.db.commit() except Exception as e: print("操作出现错误:{}".format(e)) self.db.rollback() return item
-
-
mongo pipeline
-
插入
class MongoPipeline: def __init__(self): self.client = pymongo.MongoClient(host=settings.MONGO_HOST, port=settings.MONGO_PORT) self.client.admin.authenticate(settings.MONGO_USER, settings.MONGO_PWD) self.db = self.client[settings.MONGO_DB] def process_item(self, item, spider): self.db[item.table].insert(dict(item)) # self.db[item.table].update_one({"id": item["id"]}, {"$set": item}, upsert=True) return item def close_spider(self, spider): self.client.close()
-
存在则更新, 不存在则插入, 其他代码同上
class MongoPipeline: def process_item(self, item, spider): # 根据item中唯一id进行判断, 实际运用中, 需要自己更改这个判断字段 self.db[item.table].update_one({"id": item["id"]}, {"$set": item}, upsert=True) return item
-
-
scrapy redis 主从分布式-master_pipeline
-
class MasterRedisPipeline: def __init__(self): self.redis_table = settings.REDIS_KEY # 选择表 self.redis_db = redis.Redis(host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=settings.REDIS_DB, password=settings.REDIS_PASSWORD) # redis数据库连接信息 def process_item(self, item, spider): # 存储字段自己可以根据自己需要单独设置, 这里设置字段名为url res = self.redis_db.lpush(self.redis_table, item["url"]) return item def close_spider(self, spider): self.redis_db.close()
-
-
图片下载中间件
-
需要继承ImagesPipeline
-
需要在settings中指明存储路径, IMAGES_STORE = “./images”
class ImagePipeline(ImagesPipeline): def file_path(self, request, response=None, info=None): url = request.url file_name = url.split("/")[-1] return file_name def item_completed(self, results, item, info): image_paths = [x["path"] for ok, x in results if ok] if not image_paths: raise DropItem("Image Downloaded Failed") return item def get_media_requests(self, item, info): # 根据自己需求设置url字段 yield Request(item["url"])
-