Python Distributed Crawler: Building a Search Engine
A complete search engine website built on Scrapy, Redis, Elasticsearch, and Django
https://github.com/mtianyan/ArticleSpider
What does the future hold? The era of data! Data analysis services, internet finance, data modeling, natural language processing, medical case analysis... more and more work will be built on data, and crawling is the most important way to acquire data quickly. Compared with other languages, Python crawlers are simpler and more efficient.
I. Fundamentals
1. Depth-first and breadth-first crawling strategies
Contents:
The layered url tree structure of a website:
- jobbole.com
- blog.jobbole.com
- python.jobbole.com
- python.jobbole.com/123
The circular-link problem:
links can form loops, so we need to deduplicate them.
1. depth-first
2. breadth-first
Depth-first (implemented with recursion):
follow one path down to the deepest node, then backtrack.
Breadth-first (implemented with a queue):
level-order traversal: visit the whole child generation, then the grandchild generation.
Python implementation of the depth-first traversal:
def depth_tree(tree_node):
    #visit the current node, then recurse into the left and right subtrees
    if tree_node is not None:
        print(tree_node._data)
        if tree_node._left is not None:
            depth_tree(tree_node._left)
        if tree_node._right is not None:
            depth_tree(tree_node._right)
Python implementation of the breadth-first traversal:
def level_queue(root):
    #breadth-first (level-order) traversal of a tree using a queue
    if root is None:
        return
    my_queue = []
    node = root
    my_queue.append(node)
    while my_queue:
        node = my_queue.pop(0)
        print(node.elem)
        if node.lchild is not None:
            my_queue.append(node.lchild)
        if node.rchild is not None:
            my_queue.append(node.rchild)
2. Url deduplication strategies for crawlers
- store visited urls in a database
- store urls in a set: checking whether a url was seen costs only O(1)
- hash each url (e.g. with md5) before storing it in a set, compressing every url to a fixed length without duplicates
- use a bitmap, mapping each visited url onto a single bit through a hash function
- bloomfilter improves on the bitmap: multiple hash functions reduce collisions
Scrapy's deduplication uses the third strategy; the bloomfilter approach is covered later together with distributed scrapy-redis.
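A minimal sketch of that third strategy, md5-hashed urls kept in a set (the class and method names here are illustrative, not Scrapy's actual dedup code):

import hashlib

class UrlSeenFilter(object):
    #every url is reduced to a 32-character md5 digest, whatever its original length
    def __init__(self):
        self.seen = set()

    def request_seen(self, url):
        digest = hashlib.md5(url.encode("utf-8")).hexdigest()
        if digest in self.seen:
            return True
        self.seen.add(digest)
        return False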
3. Solving Python string encoding problems:
**When reading a file and processing its content, convert to unicode.** **When saving a file, convert to utf-8 so it can be transferred.** File-reading libraries handle the conversion to unicode for us. *Python 2's default encoding is `ASCII`; Python 3's default is `utf-8`.*
#python3
import sys
sys.getdefaultencoding()
s.encode('utf-8')
#python2
import sys
sys.getdefaultencoding()
s = "我和你"
su = u"我和你"
s.encode("utf-8") #raises an error: python2 first decodes s implicitly with ascii, which fails on Chinese bytes
s.decode("gb2312").encode("utf-8")
su.encode("utf-8")
II. Crawling all articles from Jobbole (伯乐在线)
1. Initializing the project directory
Base environment
To ease later deployment, development happens inside a virtual environment.
pip install virtualenv
pip install virtualenvwrapper-win
installs virtualenv and its manager
mkvirtualenv articlespider3
creates the virtual environment
workon articlespider3
enters the virtual environment directly
deactivate
leaves the activated environment
workon
lists the existing virtual environments
Introduction to scrapy project initialization
**Create the scrapy project on the command line**
cd desktop
scrapy startproject ArticleSpider
**scrapy directory structure**: scrapy borrows django's idea of a project layout
SPIDER_MODULES = ['ArticleSpider.spiders'] #path where the spiders live
NEWSPIDER_MODULE = 'ArticleSpider.spiders'
pipelines.py:
middlewares.py:
__init__.py:
items.py:
**Create our spider**
cd ArticleSpider
scrapy genspider jobbole blog.jobbole.com
The freshly created empty project already contains template code, as follows:
# -*- coding: utf-8 -*-
import scrapy

class JobboleSpider(scrapy.Spider):
    name = "jobbole"
    allowed_domains = ["blog.jobbole.com"]
    # start_urls is the list of urls waiting to be crawled;
    #the spider downloads each request for us and goes straight to the parse stage
    start_urls = ['http://blog.jobbole.com/']

    def parse(self, response):
        pass
The command that starts a given spider from the command line:
scrapy crawl jobbole
**On windows this raises the error** `ImportError: No module named 'win32api'`
pip install pypiwin32 #fixes it
**Create our debugging helper class.** Create main.py in the project root as the debugging entry point:
# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/3/28 12:06'
from scrapy.cmdline import execute
import sys
import os

#put the project root on the python path
#os.path.abspath(__file__) is this file's absolute path
#os.path.dirname is the directory containing the file
#H:\CodePath\spider\ArticleSpider\main.py
#H:\CodePath\spider\ArticleSpider
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
#run the command, exactly as if it were typed at the cmd console
execute(["scrapy", "crawl", "jobbole"])
**In settings.py, turn off robots.txt compliance:** `ROBOTSTXT_OBEY = False`. Set a breakpoint in jobbole.py:
def parse(self, response):
    pass
The returned HtmlResponse object and its internals
show that scrapy has already downloaded the page for us, with the encoding converted too.
2. Extracting the Jobbole content
Using xpath
xpath lets you grab anything on a page without understanding front-end html or studying its detailed structure; a right-click inspection is all it takes, and it is far faster than beautifulsoup. Contents:
1. xpath introduction
2. xpath terminology and syntax
3. xpath pitfalls: javascript-generated html vs. the html source file
4. xpath crawling examples
Why use xpath?
- xpath navigates xml and html documents with path expressions
- xpath ships with a standard function library
- xpath is a w3c standard
- xpath is far faster than beautifulsoup
**xpath node relationships**
- parent node
*the node one level up*
- child nodes
- sibling nodes
*nodes that share a parent*
- ancestor nodes
*parent, grandparent, and so on*
- descendant nodes
*children, grandchildren, and so on*
xpath syntax and predicates:
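A condensed reference of the most common expressions (standard xpath, not specific to this project):

article                selects all child nodes of the article element
/article               selects the root element article
//div                  selects every div element anywhere in the document
article//div           selects every div that is a descendant of article
//@class               selects every attribute named class
/article/div[1]        the first div child of article
/article/div[last()]   the last div child of article
//div[@lang='eng']     every div whose lang attribute equals 'eng'
//div[@class]          every div that has a class attribute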
xpath pitfalls
Taking the xpath address of an element on a page:
right-click the title and inspect the element with firebug,
then on <h1>2016 腾讯软件开发面试题(部分)</h1>
right-click and copy its xpath.
# -*- coding: utf-8 -*-
import scrapy
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
re_selector = response.xpath("/html/body/div[3]/div[3]/div[1]/div[1]/h1")
# print(re_selector)
pass
Debugging shows:
re_selector = (selectorlist)[]
An empty list comes back.
A list is returned so that deeper levels under the current xpath could still be selected;
an empty one means no value was matched.
Let's inspect the element in chrome instead.
The chrome xpath version:
# -*- coding: utf-8 -*-
import scrapy
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
re_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1')
# print(re_selector)
pass
This time the value is captured.
The same xpath can be written in many ways:
re_selector = response.xpath("/html/body/div[1]/div[3]/div[1]/div[1]/h1/text()")
re2_selector = response.xpath('//*[@id="post-110287"]/div[1]/h1/text()')
re3_selector = response.xpath('//div[@class="entry-header"]/h1/text()')
id selectors are recommended because an id is unique within a page.
class selectors are recommended because they stay general and extensible when crawling many pages in a loop later.
Knowing this, we can already grab the page title, and the same right-click-copy-xpath recipe captures anything else.
Launch the interactive console for debugging:
scrapy shell http://blog.jobbole.com/110287/
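Inside the shell, response is prepopulated, so extraction rules can be tried out interactively before they go into the spider; a sample session (the printed value is the title of the page above):

>>> response.xpath('//div[@class="entry-header"]/h1/text()').extract_first("")
'2016 腾讯软件开发面试题(部分)'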
The complete xpath extraction code for the Jobbole fields:
# -*- coding: utf-8 -*-
import scrapy
import re
class JobboleSpider(scrapy.Spider):
name = "jobbole"
allowed_domains = ["blog.jobbole.com"]
start_urls = ['http://blog.jobbole.com/110287/']
def parse(self, response):
        #extract the article's concrete fields
title = response.xpath('//div[@class="entry-header"]/h1/text()').extract_first("")
create_date = response.xpath("//p[@class='entry-meta-hide-on-mobile']/text()").extract()[0].strip().replace("·","").strip()
praise_nums = response.xpath("//span[contains(@class, 'vote-post-up')]/h10/text()").extract()[0]
fav_nums = response.xpath("//span[contains(@class, 'bookmark-btn')]/text()").extract()[0]
match_re = re.match(".*?(\d+).*", fav_nums)
if match_re:
fav_nums = match_re.group(1)
comment_nums = response.xpath("//a[@href='#article-comment']/span/text()").extract()[0]
match_re = re.match(".*?(\d+).*", comment_nums)
if match_re:
comment_nums = match_re.group(1)
content = response.xpath("//div[@class='entry']").extract()[0]
tag_list = response.xpath("//p[@class='entry-meta-hide-on-mobile']/a/text()").extract()
tag_list = [element for element in tag_list if not element.strip().endswith("评论")]
tags = ",".join(tag_list)
pass
Using css selectors:
# extract the fields through css selectors
# front_image_url = response.meta.get("front_image_url", "")  # article cover image
title = response.css(".entry-header h1::text").extract_first()
create_date = response.css("p.entry-meta-hide-on-mobile::text").extract()[0].strip().replace("·","").strip()
praise_nums = response.css(".vote-post-up h10::text").extract()[0]
fav_nums = response.css(".bookmark-btn::text").extract()[0]
match_re = re.match(".*?(\d+).*", fav_nums)
if match_re:
fav_nums = int(match_re.group(1))
else:
fav_nums = 0
comment_nums = response.css("a[href='#article-comment'] span::text").extract()[0]
match_re = re.match(".*?(\d+).*", comment_nums)
if match_re:
comment_nums = int(match_re.group(1))
else:
comment_nums = 0
content = response.css("div.entry").extract()[0]
tag_list = response.css("p.entry-meta-hide-on-mobile a::text").extract()
tag_list = [element for element in tag_list if not element.strip().endswith("评论")]
tags = ",".join(tag_list)
pass
3. Crawling all articles
The yield keyword
#use Request to download the detail page; when the download finishes, the callback parse_detail() extracts the fields from the article
yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
Request from scrapy.http downloads a page:
from scrapy.http import Request
Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
parse joins urls, in case an href holds an incomplete address:
from urllib import parse
url=parse.urljoin(response.url,post_url)
parse.urljoin("http://blog.jobbole.com/all-posts/","http://blog.jobbole.com/111535/")
#result: http://blog.jobbole.com/111535/
class hierarchy in selectors
next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
#".next .page-numbers" with a space means two classes in a parent-child relationship; without the space both classes sit on the same tag
Twisted's asynchronous mechanism
Scrapy is built on the Twisted framework; Twisted's special trait is that it is event-driven and well suited to asynchronous code. Never, under any circumstances, write blocking code. Blocking code includes:
- accessing files, databases, or the web
- spawning new processes and handling their output, e.g. running shell commands
- code performing system-level operations, such as waiting on a system queue
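When blocking work cannot be avoided, Twisted can push it onto a thread pool so the event loop stays responsive; a minimal sketch (only deferToThread is Twisted's API, the other names are illustrative):

from twisted.internet import reactor
from twisted.internet.threads import deferToThread

def blocking_query(url):
    # stands in for a synchronous database or network call
    return "result for " + url

def on_done(result):
    print(result)
    reactor.stop()

# run the blocking call in a worker thread instead of the reactor thread
d = deferToThread(blocking_query, "http://blog.jobbole.com/")
d.addCallback(on_done)
reactor.run()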
The code that downloads every article with all its fields:
def parse(self, response):
    """
    1. extract the article urls on the list page and hand them to scrapy to download, then parse
    2. extract the next-page url and hand it to scrapy to download; when done, hand it back to parse
    """
    # parse all article urls on the list page and hand them to scrapy to download, then parse
    post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
    for post_url in post_urls:
        #when the request finishes downloading, parse_detail is called back to parse the article detail page
        # Request(url=post_url,callback=self.parse_detail)
        print(response.url)
        print(post_url)
        yield Request(url=parse.urljoin(response.url,post_url),callback=self.parse_detail)
        #handles hrefs that carry no domain
        #response.url + post_url
        print(post_url)
    # extract the next page and hand it to scrapy to download
    next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
    if next_url:
        yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
Logic flowchart for crawling all articles
4. Consolidating the fields with scrapy items
The task of data crawling is to extract structured data out of unstructured data.
items let us define our own fields (similar to a dict, but with a richer feature set)
On the current page we need to extract several urls.
The naive version: after extract() we get a plain list and cannot refine the selection any further:
post_urls = response.css("#archive .floated-thumb .post-thumb a::attr(href)").extract()
Improved version:
post_nodes = response.css("#archive .floated-thumb .post-thumb a")
for post_node in post_nodes:
    #get the cover image's url
    image_url = post_node.css("img::attr(src)").extract_first("")
    post_url = post_node.css("::attr(href)").extract_first("")
While downloading the page, pass the extracted cover image url on to parse_detail's response:
grab the cover url at download time and send it along through meta; the callback function then receives the value
yield Request(url=parse.urljoin(response.url,post_url),meta={"front_image_url":image_url},callback=self.parse_detail)
front_image_url = response.meta.get("front_image_url", "")
Why urljoin helps:
if the href carries no domain, the domain is taken from response; if it already has one, urljoin leaves it untouched.
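Both cases in two lines, using only the standard library:

from urllib import parse
# relative href: the domain is filled in from the base url
parse.urljoin("http://blog.jobbole.com/all-posts/", "/111535/")  # 'http://blog.jobbole.com/111535/'
# absolute href: urljoin returns it unchanged
parse.urljoin("http://blog.jobbole.com/all-posts/", "http://blog.jobbole.com/111535/")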
**Write our custom item and populate it in jobbole.py.**
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field()
url = scrapy.Field()
url_object_id = scrapy.Field()
front_image_url = scrapy.Field()
front_image_path = scrapy.Field()
praise_nums = scrapy.Field()
comment_nums = scrapy.Field()
fav_nums = scrapy.Field()
content = scrapy.Field()
tags = scrapy.Field()
Import, instantiate, then populate:
1. from ArticleSpider.items import JobBoleArticleItem
2. article_item = JobBoleArticleItem()
3. article_item["title"] = title
article_item["url"] = response.url
article_item["create_date"] = create_date
article_item["front_image_url"] = [front_image_url]
article_item["praise_nums"] = praise_nums
article_item["comment_nums"] = comment_nums
article_item["fav_nums"] = fav_nums
article_item["tags"] = tags
article_item["content"] = content
yield article_item hands this item over to the pipelines,
where the incoming item can be received and processed.
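The generated project already ships a pass-through pipeline that shows the interface every pipeline implements; process_item receives each yielded item and must return it so the next pipeline can see it:

class ArticlespiderPipeline(object):
    def process_item(self, item, spider):
        # every item yielded by a spider passes through here
        return item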
Uncomment the pipeline configuration in settings.py:
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
}
Once our item is handed to a pipeline, we can do work such as storing it in a database.
Configure the image-download pipeline in settings.py:
ITEM_PIPELINES={
'scrapy.pipelines.images.ImagesPipeline': 1,
}
H:\CodePath\pyEnvs\articlespider3\Lib\site-packages\scrapy\pipelines
holds the three pipelines scrapy provides by default,
covering files, images, and media.
ITEM_PIPELINES is a registry of data pipelines; the number on each entry is its priority, and the smaller the number, the earlier an item enters that pipeline.
Configure image downloading in settings.py:
# IMAGES_MIN_HEIGHT = 100
# IMAGES_MIN_WIDTH = 100
sets the minimum height and width of images to download.
Create an images folder, then set:
IMAGES_URLS_FIELD = "front_image_url"
project_dir = os.path.abspath(os.path.dirname(__file__))
IMAGES_STORE = os.path.join(project_dir, 'images')
Install PIL: pip install pillow
Customize our own pipeline so that, after downloading an image, it saves the image's local path:
get_media_requests() receives an iterable object and downloads the images;
item_completed() exposes each image's download location.
Subclass and override item_completed():
from scrapy.pipelines.images import ImagesPipeline
class ArticleImagePipeline(ImagesPipeline):
    #override this method to read the image's actual download path from results
def item_completed(self, results, item, info):
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
In settings.py, switch to our custom pipeline instead of the built-in one:
ITEM_PIPELINES = {
'ArticleSpider.pipelines.ArticlespiderPipeline': 300,
# 'scrapy.pipelines.images.ImagesPipeline': 1,
'ArticleSpider.pipelines.ArticleImagePipeline':1,
}
md5-hashing the image url
Create a utils package:
import hashlib
def get_md5(url):
m = hashlib.md5()
m.update(url)
return m.hexdigest()
if __name__ == "__main__":
print(get_md5("http://jobbole.com".encode("utf-8")))
Since we cannot be sure whether the caller passes str or bytes:
def get_md5(url):
    #in python3, str is already unicode
if isinstance(url, str):
url = url.encode("utf-8")
m = hashlib.md5()
m.update(url)
return m.hexdigest()
Save the url's md5 in jobbole.py:
from ArticleSpider.utils.common import get_md5
article_item["url_object_id"] = get_md5(response.url)
5. Saving the data to local files and to mysql
Saving to a local json file
import codecs opens files while sidestepping some encoding issues; a custom JsonWithEncodingPipeline implements local json storage
class JsonWithEncodingPipeline(object):
    #custom export to a json file
    def __init__(self):
        self.file = codecs.open('article.json', 'w', encoding="utf-8")
    def process_item(self, item, spider):
        #convert the item to a dict, then dump it to json; ensure_ascii=False keeps Chinese characters intact
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item
    #called when the spider closes
    def spider_closed(self, spider):
        self.file.close()
Register the pipeline in settings.py:
ITEM_PIPELINES = {
'ArticleSpider.pipelines.JsonWithEncodingPipeline': 2,
# 'scrapy.pipelines.images.ImagesPipeline': 1,
'ArticleSpider.pipelines.ArticleImagePipeline':1,
}
Exporting with scrapy's JsonItemExporter
scrapy's built-in exporters:
- 'CsvItemExporter',
- 'XmlItemExporter',
- 'JsonItemExporter'
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
    #use the json exporter scrapy provides to export a json file
def __init__(self):
self.file = open('articleexport.json', 'wb')
self.exporter = JsonItemExporter(self.file, encoding="utf-8", ensure_ascii=False)
self.exporter.start_exporting()
def close_spider(self, spider):
self.exporter.finish_exporting()
self.file.close()
def process_item(self, item, spider):
self.exporter.export_item(item)
return item
Register that pipeline in settings.py:
'ArticleSpider.pipelines.JsonExporterPipleline': 2
Saving to the database (mysql)
Design the table so its columns match the item's fields one to one; the database-to-item relationship resembles the model-to-form relationship in django.
Date conversion, turning the string into a datetime:
import datetime
try:
create_date = datetime.datetime.strptime(create_date, "%Y/%m/%d").date()
except Exception as e:
create_date = datetime.datetime.now().date()
Table design
- the three *_nums columns are NOT NULL with a default of 0
- content is a longtext
- url_object_id is the primary key
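A sketch of the matching DDL under those constraints (the varchar lengths and any column type not stated above are assumptions):

CREATE TABLE jobbole_article (
    title            VARCHAR(200) NOT NULL,
    create_date      DATE,
    url              VARCHAR(300),
    url_object_id    VARCHAR(50) NOT NULL,
    front_image_url  VARCHAR(300),
    front_image_path VARCHAR(200),
    praise_nums      INT NOT NULL DEFAULT 0,
    comment_nums     INT NOT NULL DEFAULT 0,
    fav_nums         INT NOT NULL DEFAULT 0,
    tags             VARCHAR(200),
    content          LONGTEXT,
    PRIMARY KEY (url_object_id)
);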
Install the database driver: pip install mysqlclient
Fixes for installation errors on Linux:
ubuntu: sudo apt-get install libmysqlclient-dev
centos: sudo yum install python-devel mysql-devel
Writing the (synchronous) database pipeline:
import MySQLdb
class MysqlPipeline(object):
    #write into mysql synchronously
def __init__(self):
self.conn = MySQLdb.connect('127.0.0.1', 'root', 'password', 'article_spider', charset="utf8", use_unicode=True)
self.cursor = self.conn.cursor()
def process_item(self, item, spider):
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s)
"""
self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
self.conn.commit()
Writing the asynchronous (Twisted) database pipeline
Our crawl rate may exceed the rate at which the database can store rows, hence asynchronous inserts.
Making the parameters configurable
set in settings.py:
MYSQL_HOST = "127.0.0.1"
MYSQL_DBNAME = "article_spider"
MYSQL_USER = "root"
MYSQL_PASSWORD = "123456"
Reading the configured parameters inside the code
The twisted asynchronous version:
import MySQLdb.cursors
from twisted.enterprise import adbapi

#connection pool: ConnectionPool
# def __init__(self, dbapiName, *connargs, **connkw):
class MysqlTwistedPipline(object):
    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        dbparms = dict(
            host = settings["MYSQL_HOST"],
            db = settings["MYSQL_DBNAME"],
            user = settings["MYSQL_USER"],
            passwd = settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        #**dbparms --> ConnectionPool("MySQLdb", host=settings['MYSQL_HOST'], ...)
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        #use twisted to turn the mysql insert into an asynchronous operation
        query = self.dbpool.runInteraction(self.do_insert, item)
        query.addErrback(self.handle_error, item, spider) #handle exceptions
        return item

    def handle_error(self, failure, item, spider):
        #handle exceptions raised by the asynchronous insert
        print (failure)

    def do_insert(self, cursor, item):
        #perform the actual insert
        #build a different sql statement per item type and insert it into mysql
        insert_sql, params = item.get_insert_sql()
        cursor.execute(insert_sql, params)
Optional: django items
https://github.com/scrapy-plugins/scrapy-djangoitem
lets the items we save map directly onto django models.
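A minimal sketch of its usage following that repo's README (the Article model and its app path are placeholders):

from scrapy_djangoitem import DjangoItem
from myapp.models import Article  # hypothetical django model

class ArticleItem(DjangoItem):
    # the item's fields are derived from the django model automatically
    django_model = Article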
Maintaining the extraction code with scrapy's itemloader
The itemloader provides a container in which we configure which rule each field uses:
add_css, add_value, add_xpath
from scrapy.loader import ItemLoader
# load the item through the item loader
front_image_url = response.meta.get("front_image_url", "")  # article cover image
item_loader = ItemLoader(item=JobBoleArticleItem(), response=response)
item_loader.add_css("title", ".entry-header h1::text")
item_loader.add_value("url", response.url)
item_loader.add_value("url_object_id", get_md5(response.url))
item_loader.add_css("create_date", "p.entry-meta-hide-on-mobile::text")
item_loader.add_value("front_image_url", [front_image_url])
item_loader.add_css("praise_nums", ".vote-post-up h10::text")
item_loader.add_css("comment_nums", "a[href='#article-comment'] span::text")
item_loader.add_css("fav_nums", ".bookmark-btn::text")
item_loader.add_css("tags", "p.entry-meta-hide-on-mobile a::text")
item_loader.add_css("content", "div.entry")
#call this method to apply the rules and generate the item object
article_item = item_loader.load_item()
- every value becomes a list
- processor functions are applied to these values
Processor functions for items, in items.py
MapCompose accepts one or more functions to run over the field, applied in order
from scrapy.loader.processors import MapCompose
def add_mtianyan(value):
return value+"-mtianyan"
title = scrapy.Field(
input_processor=MapCompose(lambda x:x+"mtianyan",add_mtianyan),
)
Note: a custom function used here must be defined above the code that references it.
create_date = scrapy.Field(
input_processor=MapCompose(date_convert),
output_processor=TakeFirst()
)
Takes only the first value of the list.
A custom itemloader that extracts the first value by default:
class ArticleItemLoader(ItemLoader):
    #custom itemloader whose default output processor takes the first value
default_output_processor = TakeFirst()
Keeping the raw list value:
def return_value(value):
return value
front_image_url = scrapy.Field(
output_processor=MapCompose(return_value)
)
Add an if to the image-download pipeline for broader applicability:
class ArticleImagePipeline(ImagesPipeline):
    #override this method to read the image's actual download path from results
def item_completed(self, results, item, info):
if "front_image_url" in item:
for ok, value in results:
image_file_path = value["path"]
item["front_image_path"] = image_file_path
return item
The complete custom item with its processor functions:
class JobBoleArticleItem(scrapy.Item):
title = scrapy.Field()
create_date = scrapy.Field(
input_processor=MapCompose(date_convert),
)
url = scrapy.Field()
url_object_id = scrapy.Field()
front_image_url = scrapy.Field(
output_processor=MapCompose(return_value)
)
front_image_path = scrapy.Field()
praise_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
comment_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
fav_nums = scrapy.Field(
input_processor=MapCompose(get_nums)
)
    #tags is itself a list, so the default output processor must be overridden
tags = scrapy.Field(
input_processor=MapCompose(remove_comment_tags),
output_processor=Join(",")
)
content = scrapy.Field()
III. Crawling Zhihu questions and answers
1. Fundamentals
The session and cookie mechanism
How a session works:
(1) when a session is first started, a unique identifier is stored in a local cookie.
(2) session_start() then loads the stored session variables from the session store.
(3) session variables are registered through session_register().
(4) when the script finishes, session variables that were not destroyed are automatically saved back to the local session store.
Simulating Zhihu login with requests
http status codes
Getting the xsrf token:
def get_xsrf():
    #get the xsrf code
    response = requests.get("https://www.zhihu.com", headers=header)
    # # print(response.text)
    # text ='<input type="hidden" name="_xsrf" value="ca70366e5de5d133c3ae09fb16d9b0fa"/>'
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response.text)
    if match_obj:
        return (match_obj.group(1))
    else:
        return ""
Python code that simulates logging in to Zhihu:
# _*_ coding: utf-8 _*_
__author__ = 'mtianyan'
__date__ = '2017/5/23 16:42'
import requests
try:
    import cookielib
except:
    import http.cookiejar as cookielib
import re

session = requests.session()
session.cookies = cookielib.LWPCookieJar(filename="cookies.txt")
try:
    session.cookies.load(ignore_discard=True)
except:
    print ("cookies could not be loaded")

agent = "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36"
header = {
    "HOST": "www.zhihu.com",
    "Referer": "https://www.zhihu.com",
    'User-Agent': agent
}
def is_login():
    #judge login state from the status code returned by a members-only page
    inbox_url = "https://www.zhihu.com/question/56250357/answer/148534773"
    response = session.get(inbox_url, headers=header, allow_redirects=False)
    if response.status_code != 200:
        return False
    else:
        return True

def get_xsrf():
    #get the xsrf code
    response = session.get("https://www.zhihu.com", headers=header)
    response_text = response.text
    #re.DOTALL makes . match newlines too, so the whole text is searched
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = (match_obj.group(1))
    return xsrf
def get_index():
response = session.get("https://www.zhihu.com", headers=header)
with open("index_page.html", "wb") as f:
f.write(response.text.encode("utf-8"))
print ("ok")
def get_captcha():
import time
t = str(int(time.time()*1000))
captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
t = session.get(captcha_url, headers=header)
with open("captcha.jpg","wb") as f:
f.write(t.content)
f.close()
from PIL import Image
try:
im = Image.open('captcha.jpg')
im.show()
im.close()
except:
pass
    captcha = input("enter the captcha\n>")
return captcha
def zhihu_login(account, password):
    #log in to zhihu
    if re.match("^1\d{10}", account):
        print ("logging in with a phone number")
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": get_xsrf(),
            "phone_num": account,
            "password": password,
            "captcha": get_captcha()
        }
    else:
        if "@" in account:
            #check whether the account name is an email address
            print("logging in with an email address")
            post_url = "https://www.zhihu.com/login/email"
            post_data = {
                "_xsrf": get_xsrf(),
                "email": account,
                "password": password
            }
    response_text = session.post(post_url, data=post_data, headers=header)
    session.cookies.save()
# get_index()
# is_login()
# get_captcha()
zhihu_login("phone", "password")
zhihu_login("shouji", "mima")
2. Creating the Zhihu spider and logging in with scrapy
scrapy genspider zhihu www.zhihu.com
Since Zhihu requires us to log in first, we override its start_requests:
def start_requests(self):
return [scrapy.Request('https://www.zhihu.com/#signin', headers=self.headers, callback=self.login)]
This downloads the home page and calls back the login function.
login requests the captcha and calls back login_after_captcha; post_data is sent along through meta so the later callback can use it.
def login(self, response):
    response_text = response.text
    #get the xsrf
    match_obj = re.match('.*name="_xsrf" value="(.*?)"', response_text, re.DOTALL)
    xsrf = ''
    if match_obj:
        xsrf = (match_obj.group(1))
    if xsrf:
        post_url = "https://www.zhihu.com/login/phone_num"
        post_data = {
            "_xsrf": xsrf,
            "phone_num": "phone",
            "password": "password",
            "captcha": ""
        }
        import time
        t = str(int(time.time() * 1000))
        captcha_url = "https://www.zhihu.com/captcha.gif?r={0}&type=login".format(t)
        #request the captcha and call back login_after_captcha
        yield scrapy.Request(captcha_url, headers=self.headers,
                             meta={"post_data": post_data}, callback=self.login_after_captcha)
- login_after_captcha saves the captcha image locally and opens it with PIL; after reading it by eye we type the value into the console,
then it takes the meta data from step one and submits everything to the login endpoint, calling back check_login to verify success.
def login_after_captcha(self, response):
    with open("captcha.jpg", "wb") as f:
        f.write(response.body)
        f.close()
    from PIL import Image
    try:
        im = Image.open('captcha.jpg')
        im.show()
        im.close()
    except:
        pass
    captcha = input("enter the captcha\n>")
    post_data = response.meta.get("post_data", {})
    post_url = "https://www.zhihu.com/login/phone_num"
    post_data["captcha"] = captcha
    return [scrapy.FormRequest(
        url=post_url,
        formdata=post_data,
        headers=self.headers,
        callback=self.check_login
    )]
- check_login inspects the data the server returns to decide whether login succeeded.
scrapy deduplicates request URLs (RFPDupeFilter); adding dont_filter tells it to exempt this URL from deduplication.
The original start_requests in the source:
def start_requests(self):
for url in self.start_urls:
yield self.make_requests_from_url(url)
We moved the body of the original start_requests into check_login, the last callback in the chain:
def check_login(self, response):
    #verify the server's response to decide whether login succeeded
    text_json = json.loads(response.text)
    if "msg" in text_json and text_json["msg"] == "登录成功":
        for url in self.start_urls:
            yield scrapy.Request(url, dont_filter=True, headers=self.headers)
3. Designing the Zhihu data tables
Defining the table fields
Analyzing Zhihu's urls
Clicking "查看更多" (view more) under a concrete question
exposes an api endpoint whose
key parameters are: offset=43
is_end = true
next
href="/question/25460323"
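That endpoint is what the spider's start_answer_url template (used further down as self.start_answer_url.format(question_id, 20, 0)) points at; a sketch of how it could be defined — the exact include parameters of Zhihu's v4 api are omitted here, so treat this shape as an assumption:

# answers api template: question id, page size, offset (include params elided)
start_answer_url = "https://www.zhihu.com/api/v4/questions/{0}/answers?limit={1}&offset={2}"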
all_urls = [parse.urljoin(response.url, url) for url in all_urls]
- Collect all a tags from the home page. If an extracted url matches the form /question/xxx, download it and go straight into the parsing function parse_question;
if it is not a question page, simply keep following it.
def parse(self, response):
    """
    extract all urls from the html page and follow them for further crawling;
    if an extracted url matches the form /question/xxx, download it and go straight into the parsing function
    """
    all_urls = response.css("a::attr(href)").extract()
    all_urls = [parse.urljoin(response.url, url) for url in all_urls]
    #filter each url with a lambda: keep it in the list when true, drop it when false
    all_urls = filter(lambda x:True if x.startswith("https") else False, all_urls)
    for url in all_urls:
        match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", url)
        if match_obj:
            # a question-related page: download it and hand it to the extraction function
            request_url = match_obj.group(1)
            yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
        else:
            # not a question page: keep following it
            yield scrapy.Request(url, headers=self.headers, callback=self.parse)
- parse_question then takes over.
**Create our items.**
A helper method the items depend on, in ArticleSpider\utils\common.py:
def extract_num(text):
    #extract the number from a string
match_re = re.match(".*?(\d+).*", text)
if match_re:
nums = int(match_re.group(1))
else:
nums = 0
return nums
In settings.py, set SQL_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
SQL_DATE_FORMAT = "%Y-%m-%d"
Usage:
from ArticleSpider.settings import SQL_DATETIME_FORMAT
The Zhihu question item:
class ZhihuQuestionItem(scrapy.Item):
    #zhihu question item
zhihu_id = scrapy.Field()
topics = scrapy.Field()
url = scrapy.Field()
title = scrapy.Field()
content = scrapy.Field()
answer_num = scrapy.Field()
comments_num = scrapy.Field()
watch_user_num = scrapy.Field()
click_num = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
        #sql statement for inserting into the zhihu_question table
insert_sql = """
insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
"""
zhihu_id = self["zhihu_id"][0]
topics = ",".join(self["topics"])
url = self["url"][0]
title = "".join(self["title"])
content = "".join(self["content"])
answer_num = extract_num("".join(self["answer_num"]))
comments_num = extract_num("".join(self["comments_num"]))
if len(self["watch_user_num"]) == 2:
watch_user_num = int(self["watch_user_num"][0])
click_num = int(self["watch_user_num"][1])
else:
watch_user_num = int(self["watch_user_num"][0])
click_num = 0
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time)
return insert_sql, params
The Zhihu answer item:
class ZhihuAnswerItem(scrapy.Item):
    #zhihu answer item
zhihu_id = scrapy.Field()
url = scrapy.Field()
question_id = scrapy.Field()
author_id = scrapy.Field()
content = scrapy.Field()
parise_num = scrapy.Field()
comments_num = scrapy.Field()
create_time = scrapy.Field()
update_time = scrapy.Field()
crawl_time = scrapy.Field()
def get_insert_sql(self):
        #sql statement for inserting into the zhihu_answer table
insert_sql = """
insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
create_time, update_time, crawl_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
update_time=VALUES(update_time)
"""
create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
params = (
self["zhihu_id"], self["url"], self["question_id"],
self["author_id"], self["content"], self["parise_num"],
self["comments_num"], create_time, update_time,
self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
With both items in place, we continue filling in the logic:
def parse_question(self, response):
    #process the question page and extract the concrete question item from it
if "QuestionHeader-title" in response.text:
        #handle the new page version
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
item_loader.add_css("title", "h1.QuestionHeader-title::text")
item_loader.add_css("content", ".QuestionHeader-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", ".List-headerText span::text")
item_loader.add_css("comments_num", ".QuestionHeader-actions button::text")
item_loader.add_css("watch_user_num", ".NumberBoard-value::text")
item_loader.add_css("topics", ".QuestionHeader-topics .Popover div::text")
question_item = item_loader.load_item()
else:
        #handle item extraction on the old page version
match_obj = re.match("(.*zhihu.com/question/(\d+))(/|$).*", response.url)
if match_obj:
question_id = int(match_obj.group(2))
item_loader = ItemLoader(item=ZhihuQuestionItem(), response=response)
# item_loader.add_css("title", ".zh-question-title h2 a::text")
item_loader.add_xpath("title", "//*[@id='zh-question-title']/h2/a/text()|//*[@id='zh-question-title']/h2/span/text()")
item_loader.add_css("content", "#zh-question-detail")
item_loader.add_value("url", response.url)
item_loader.add_value("zhihu_id", question_id)
item_loader.add_css("answer_num", "#zh-question-answer-num::text")
item_loader.add_css("comments_num", "#zh-question-meta-wrap a[name='addcomment']::text")
# item_loader.add_css("watch_user_num", "#zh-question-side-header-wrap::text")
item_loader.add_xpath("watch_user_num", "//*[@id='zh-question-side-header-wrap']/text()|//*[@class='zh-question-followers-sidebar']/div/a/strong/text()")
item_loader.add_css("topics", ".zm-tag-editor-labels a::text")
question_item = item_loader.load_item()
yield scrapy.Request(self.start_answer_url.format(question_id, 20, 0), headers=self.headers, callback=self.parse_answer)
yield question_item
Processing the question's answers and extracting the needed fields:
def parse_answer(self, response):
    #process a question's answers
    ans_json = json.loads(response.text)
    is_end = ans_json["paging"]["is_end"]
    next_url = ans_json["paging"]["next"]
    #extract the answer's concrete fields
    for answer in ans_json["data"]:
        answer_item = ZhihuAnswerItem()
        answer_item["zhihu_id"] = answer["id"]
        answer_item["url"] = answer["url"]
        answer_item["question_id"] = answer["question"]["id"]
        answer_item["author_id"] = answer["author"]["id"] if "id" in answer["author"] else None
        answer_item["content"] = answer["content"] if "content" in answer else None
        answer_item["parise_num"] = answer["voteup_count"]
        answer_item["comments_num"] = answer["comment_count"]
        answer_item["create_time"] = answer["created_time"]
        answer_item["update_time"] = answer["updated_time"]
        answer_item["crawl_time"] = datetime.datetime.now()
        yield answer_item
    if not is_end:
        yield scrapy.Request(next_url, headers=self.headers, callback=self.parse_answer)
Flowchart of the Zhihu field extraction:
Depth-first:
1. extract all urls from the page and filter out the unwanted ones
2. if it is a question url, enter the question parsing routine
3. once that question has been fully crawled, return to the initial parsing routine
Writing the items into the database
Error handling in pipelines.py
Insert-time errors can be monitored through this method:
def handle_error(self, failure, item, spider):
    #handle exceptions raised by the asynchronous insert
print (failure)
Refactoring the pipeline to make it more generic
The original pipeline, hard-coded to one table:
def do_insert(self, cursor, item):
    #perform the actual insert
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
After the rewrite:
def do_insert(self, cursor, item):
    #build a different sql statement per item type and insert it into mysql
insert_sql, params = item.get_insert_sql()
cursor.execute(insert_sql, params)
Option one:
if item.__class__.__name__ == "JobBoleArticleItem":
    #perform the actual insert
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s)
"""
cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
Recommended approach:
put the sql statement and its parameters inside the item itself.
The method inside the JobBoleArticleItem class:
def get_insert_sql(self):
insert_sql = """
insert into jobbole_article(title, url, create_date, fav_nums)
VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE fav_nums=VALUES(fav_nums)
"""
params = (self["title"], self["url"], self["create_date"], self["fav_nums"])
return insert_sql, params
The Zhihu question:
def get_insert_sql(self):
    #sql statement for inserting into the zhihu_question table
insert_sql = """
insert into zhihu_question(zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
"""
zhihu_id = self["zhihu_id"][0]
topics = ",".join(self["topics"])
url = self["url"][0]
title = "".join(self["title"])
content = "".join(self["content"])
answer_num = extract_num("".join(self["answer_num"]))
comments_num = extract_num("".join(self["comments_num"]))
if len(self["watch_user_num"]) == 2:
watch_user_num = int(self["watch_user_num"][0])
click_num = int(self["watch_user_num"][1])
else:
watch_user_num = int(self["watch_user_num"][0])
click_num = 0
crawl_time = datetime.datetime.now().strftime(SQL_DATETIME_FORMAT)
params = (zhihu_id, topics, url, title, content, answer_num, comments_num,
watch_user_num, click_num, crawl_time)
return insert_sql, params
The Zhihu answer:
def get_insert_sql(self):
    #sql statement for inserting into the zhihu_answer table
insert_sql = """
insert into zhihu_answer(zhihu_id, url, question_id, author_id, content, parise_num, comments_num,
create_time, update_time, crawl_time
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE content=VALUES(content), comments_num=VALUES(comments_num), parise_num=VALUES(parise_num),
update_time=VALUES(update_time)
"""
create_time = datetime.datetime.fromtimestamp(self["create_time"]).strftime(SQL_DATETIME_FORMAT)
update_time = datetime.datetime.fromtimestamp(self["update_time"]).strftime(SQL_DATETIME_FORMAT)
params = (
self["zhihu_id"], self["url"], self["question_id"],
self["author_id"], self["content"], self["parise_num"],
self["comments_num"], create_time, update_time,
self["crawl_time"].strftime(SQL_DATETIME_FORMAT),
)
return insert_sql, params
When the same row is crawled a second time, update it instead of failing:
ON DUPLICATE KEY UPDATE content=VALUES(content), answer_num=VALUES(answer_num), comments_num=VALUES(comments_num),
watch_user_num=VALUES(watch_user_num), click_num=VALUES(click_num)
Debugging tips
if match_obj:
    #a question-related page: download it and hand it to the extraction function
    request_url = match_obj.group(1)
    yield scrapy.Request(request_url, headers=self.headers, callback=self.parse_question)
    #for easier debugging
    break
else:
    #for easier debugging
    pass
    #not a question page: keep following it
    #for easier debugging
    # yield scrapy.Request(url, headers=self.headers, callback=self.parse)
#for easier debugging
# yield question_item
Troubleshooting
[key error] title
Locate which item triggered the error by debugging inside the pipeline.
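One way to surface the offending item is to extend the async pipeline's errback; a sketch (the extra print is an addition, not the course's code):

def handle_error(self, failure, item, spider):
    #print the failing item's class and contents alongside the traceback
    print(failure)
    print(item.__class__.__name__, dict(item))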
IV. Site-wide crawling of the job site Lagou (拉勾网) with CrawlSpider
Recommended tool: cmder
http://cmder.net/
Download the full version so that part of the linux command set works on windows too.
Add it to the path environment variable.
1. Designing Lagou's data table structure
2. Initializing the Lagou project and reading the crawl source code
scrapy genspider --list
lists the available spider templates:
Available templates:
- basic
- crawl
- csvfeed
- xmlfeed
scrapy genspider -t crawl lagou www.lagou.com
Unlike pycharm, cmd does not mark the sources root;
configure the directory in settings.py.
The crawl template:
class LagouSpider(CrawlSpider):
name = 'lagou'
allowed_domains = ['www.lagou.com']
start_urls = ['http://www.lagou.com/']
rules = (
Rule(LinkExtractor(allow=r'Items/'), callback='parse_item', follow=True),
)
def parse_item(self, response):
i = {}
#i['domain_id'] = response.xpath('//input[@id="sid"]/@value').extract()
#i['name'] = response.xpath('//div[@id="name"]').extract()
#i['description'] = response.xpath('//div[@id="description"]').extract()
return i
Dissecting the source code
https://doc.scrapy.org/en/1.3/topics/spiders.html#crawlspider
CrawlSpider provides rules that let us follow links simply and crawl iteratively:
rules:
parse_start_url(response):
example:
rules is an iterable of Rule instances, each wrapping a LinkExtractor, e.g. allow=('category\.php', ), callback='parse_item';
allow is the permitted url pattern, and callback names the function to call back.
The callback is given as a string because there is no self inside rules, so the method object cannot be referenced directly.
import scrapy
from scrapy.spiders import CrawlSpider, Rule
from scrapy.linkextractors import LinkExtractor
class MySpider(CrawlSpider):
name = 'example.com'
allowed_domains = ['example.com']
start_urls = ['http://www.example.com']
rules = (
# Extract links matching 'category.php' (but not matching 'subsection.php')
# and follow links from them (since no callback means follow=True by default).
Rule(LinkExtractor(allow=('category\.php', ), deny=('subsection\.php', ))),
# Extract links matching 'item.php' and parse them with the spider's method parse_item
Rule(LinkExtractor(allow=('item\.php', )), callback='parse_item'),
)
def parse_item(self, response):
self.logger.info('Hi, this is an item page! %s', response.url)
item = scrapy.Item()
item['id'] = response.xpath('//td[@id="item_id"]/text()').re(r'ID: (\d+)')
item['name'] = response.xpath('//td[@id="item_name"]/text()').extract()
item['description'] = response.xpath('//td[@id="item_description"]/text()').extract()
return item
Adapting the Lagou template code
1. change http to https
2. rename parse_item to our own parse_job
3. ctrl-click CrawlSpider in class LagouSpider(CrawlSpider):
to enter the crawl source code
4. class CrawlSpider(Spider):
shows that it inherits from Spider
5. entry point: def start_requests(self):
6. alt+left/right arrow keys jump between code locations
7. after step 5, the default parse runs; CrawlSpider defines its own parse function, so this time we must not override it the way we did before
The core function of crawl.py is parse:
def parse(self, response):
return self._parse_response(response, self.parse_start_url, cb_kwargs={}, follow=True)
_parse_response
- checks whether there is a callback, i.e. whether self.parse_start_url exists
- we can overload parse_start_url to add our own processing
- it passes the arguments through and calls the process_results function
The _parse_response function:
def _parse_response(self,