系列文章
Python 学习 01 —— Python基础 Python 库学习 —— urllib 学习 Python 库学习 —— BeautifulSoup4学习 Python 库学习 —— Re 正则表达式 Python 库学习 —— Excel存储(xlwt、xlrd) Python 学习 02 —— Python爬虫 Python 库学习 —— Flask 基础学习 Python 学习03 —— 爬虫网站项目
三、实战项目
1、项目说明
-
源码分享: CSDN:https://download.csdn.net/download/qq_39763246/20255591 百度:https://pan.baidu.com/s/1rbeB8qSSV-reki6umd9_3g 提取码: fxq4 Gitee:https://gitee.com/coder-zcy/douban_spider -
技术框架: 后端使用Flask框架进行路由解析和模板渲染,前端是在网上先搜了个模板然后用Bootstrap和Layui修改了下,数据存储用了Excel和SQLite。 -
项目内容: 主要是把前面Python学习的内容进行了整合练手,使用Python对豆瓣电影Top250进行爬取,将爬取到的数据存储到SQLite和Excel,然后通过Layui和ECharts将数据展现到网站上。同时,利用WordCloud进行词云图片生成,用户可以上传图片,由网站对图片进行词云化处理。
下面是网站的实际效果。
2、项目代码
这里只贴Python部分,前端内容太多了。代码里的注释还是比较详细的,就不再过多说明了。用到的其他技术可以参考我之前写的文章。
-
项目结构 -
爬虫部分 spider.py
import re
from bs4 import BeautifulSoup
import urllib.request, urllib.error, urllib.parse
import xlwt
import sqlite3
import ssl
import os
# Precompiled patterns used to pull each field out of a movie's HTML block.
# NOTE(review): several character-class groups below (e.g. "( )") look like
# they may have lost characters such as "&nbsp;" in transcription — verify
# against the original source before relying on them.
findLink = re.compile(r'<a href="(.*?)">')  # detail-page URL
findImg = re.compile(r'<img.*src="(.*?)"', re.S)  # poster URL (re.S: tag may span lines)
findTitle = re.compile(r'<span class="title">(.*)</span>')  # Chinese / foreign titles
findOther = re.compile(r'<span class="other">(.*)</span>')  # alternate names
findRating = re.compile(r'<span class="rating_num" property="v:average">(.*)</span>')  # score
findJudge = re.compile(r'<span>(\d*)人评价</span>')  # number of ratings
findInq = re.compile(r'<span class="inq">(.*)</span>')  # one-line summary
findBd = re.compile(r'<p class="">(.*?)</p>', re.S)  # info paragraph (director/year/country/genre)
def spiderStart():
    """Run the full scrape: fetch Douban Top250, save to Excel and SQLite.

    Returns the string "success" on completion and "false" on any error;
    the /spider route's JSON response depends on this string contract.
    """
    try:
        # Douban's HTTPS cert chain may fail local verification; disable
        # verification globally (acceptable for this hobby scraper).
        ssl._create_default_https_context = ssl._create_unverified_context
        baseUrl = "https://movie.douban.com/top250?start="
        dataList = getData(baseUrl)
        # NOTE(review): xlwt writes legacy .xls content even though the
        # filename says .xlsx; kept because /download_excel serves this name.
        savePath = r"static\file\豆瓣电影Top250.xlsx"
        saveData(dataList, savePath)
        dbPath = r"static\file\movie250.db"
        saveDataToDB(dataList, dbPath)
        return "success"
    except Exception as exc:
        # Surface the cause instead of swallowing it silently; the original
        # also had an unreachable trailing `return "false"` (removed).
        print("spiderStart failed:", exc)
        return "false"
def askURL(url):
    """Fetch *url* and return the response body decoded as UTF-8.

    On a URLError the status code and/or reason are printed and an empty
    string is returned instead of raising.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36"
    }
    req = urllib.request.Request(url, headers=headers)
    body = ""
    try:
        resp = urllib.request.urlopen(req)
        body = resp.read().decode("utf-8")
    except urllib.error.URLError as err:
        if hasattr(err, "code"):
            print("状态码:", err.code)
        if hasattr(err, "reason"):
            print("原因:", err.reason)
    return body
def getData(baseurl):
    """Scrape all 10 pages (250 movies) of Douban Top250 into a list of rows.

    Each row is: [detail link, image link, Chinese title, foreign title,
    other names, director, country, genre, year, score, rating count,
    one-line summary] — the same order as the Excel columns and DB schema.
    Parsing relies on the exact page layout; a site change will raise
    IndexError/ValueError below.
    """
    dataList = []
    for i in range(0, 10):  # pages are requested as start=0,25,...,225
        url = baseurl + str(i * 25)
        html = askURL(url)
        soup = BeautifulSoup(html, "html.parser")
        for item in soup.find_all("div", class_="item"):  # one div per movie
            data = []
            item = str(item)  # the precompiled regexes run over raw HTML text
            link = re.findall(findLink, item)[0]
            data.append(link)
            img = re.findall(findImg, item)[0]
            data.append(img)
            titles = re.findall(findTitle, item)
            # Two title spans mean both a Chinese and a foreign title exist.
            if (len(titles) == 2):
                data.append(titles[0])
                # strip the "/" separator (and spaces) from the foreign title
                titles[1] = re.sub("(/)|( )", " ", titles[1])
                data.append("".join(titles[1].split()))
            else:
                data.append(titles[0])
                data.append("")  # no foreign title
            other = re.findall(findOther, item)[0]
            other = other.replace("/", "", 1)  # drop the leading separator
            other = re.sub("( )|(NBSP)|(\\xa0)|( )", "", other)  # drop nbsp artifacts
            data.append(other)
            bd = re.findall(findBd, item)[0]
            # Director name follows "导演: "; cut at the first remaining space.
            # NOTE(review): the preceding re.sub appears to remove spaces, so
            # director.index(" ") looks fragile — confirm against live pages.
            director = re.sub("( )|(\\xa0)|(\n)", "", bd[bd.index("导演: ") + 4:])
            director = director[:director.index(" ")]
            # Text after <br/> is "year / country / genre".
            info = re.sub("( )|(\\xa0)|(\n)", "", bd[bd.index("<br/>") + 5:])
            info = info.split("/")
            year = info[0].strip()
            country = info[1].strip()
            leibie = info[2].strip()  # "leibie" (类别) = genre
            data.append(director)
            data.append(country)
            data.append(leibie)
            data.append(year)
            rating = re.findall(findRating, item)[0]
            data.append(rating)
            judgeNum = re.findall(findJudge, item)[0]
            data.append(judgeNum)
            inq = re.findall(findInq, item)
            if len(inq) != 0:
                inq = inq[0].replace("。", "")  # drop trailing full stop
                data.append(inq)
            else:
                data.append("")  # some movies have no summary line
            dataList.append(data)
    return dataList
def saveData(dataList, savePath):
    """Write the scraped rows to a workbook at *savePath*, overwriting any
    existing file. Returns True on success.

    NOTE(review): xlwt emits legacy .xls content even when the filename
    ends in .xlsx — kept as-is because /download_excel serves this name.
    """
    print("正在保存...")
    book = xlwt.Workbook(encoding="utf-8")
    sheet = book.add_sheet("豆瓣电影Top250", cell_overwrite_ok=True)
    col = ("详情链接", "图片链接", "中文名", "外国名", "别称", "导演", "国家", "类型", "年份", "评分", "评价人数", "一句话概括")
    for i in range(0, len(col)):
        sheet.write(0, i, col[i])
    # Iterate whatever was actually scraped instead of a hard-coded 250 rows,
    # so a partial scrape no longer raises IndexError.
    for row, data in enumerate(dataList, start=1):
        for j in range(0, len(col)):
            sheet.write(row, j, data[j])
    # os.path.exists replaces the original open()-based probe, which leaked
    # an open file handle when the file existed.
    if os.path.exists(savePath):
        print("Excel文件存在,进行删除")
        os.remove(savePath)
    else:
        print("Excel文件不存在,直接保存")
    book.save(savePath)
    print("保存完成!!")
    return True
def iniDB(dbPath):
    """Reset the movie250 table in the SQLite db at *dbPath*: drop any
    existing table, then create it fresh with the full column set."""
    print("正在初始化...")
    connect = sqlite3.connect(dbPath)
    cursor = connect.cursor()
    statements = (
        "drop table if exists movie250",
        '''
        create table if not exists movie250(
            id integer primary key autoincrement,
            info_link text,
            pic_link text,
            c_name varchar,
            e_name varchar,
            other_name varchar,
            director varchar,
            country varchar,
            types varchar,
            years varchar,
            score numeric,
            rated numeric,
            introduction text
        );
        ''',
    )
    # Same drop/commit then create/commit sequence as before.
    for statement in statements:
        cursor.execute(statement)
        connect.commit()
    cursor.close()
    connect.close()
    print("初始化完成!!")
def saveDataToDB(dataList, dbPath):
    """Recreate the movie250 table and insert all scraped rows.

    Returns True on success. Uses parameterized statements instead of the
    original string-built SQL, which broke on quotes in the data and
    mutated *dataList* in place by wrapping every field in double quotes.
    """
    iniDB(dbPath)
    connect = sqlite3.connect(dbPath)
    cursor = connect.cursor()
    print("正在入库...")
    sql = '''
        insert into movie250(
        id, info_link, pic_link, c_name, e_name, other_name, director,
        country, types, years, score, rated, introduction)
        values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)'''
    for i, data in enumerate(dataList, start=1):
        # id is assigned sequentially starting at 1, as before.
        cursor.execute(sql, [i] + list(data))
    connect.commit()
    cursor.close()
    connect.close()
    print("入库完毕!!")
    return True
-
词云部分 word.py
import jieba
from matplotlib import pyplot as plt
from wordcloud import WordCloud, ImageColorGenerator
from PIL import Image
import numpy as np
import sqlite3
def generateImage(image, font, dpi):
    """Render a word cloud shaped by *image*, overwriting *image* in place.

    Word frequencies come from the `introduction` column of movie250.db,
    tokenized with jieba. *font* is a filename under static/fonts/.
    Returns "success".
    """
    connect = sqlite3.connect("static/file/movie250.db")
    cursor = connect.cursor()
    sql = "select introduction from movie250"
    data = cursor.execute(sql)
    text = ""
    for item in data:
        text = text + item[0]
    cursor.close()
    connect.close()
    cut = jieba.cut(text)
    string = " ".join(cut)
    # The uploaded image doubles as the cloud's shape mask.
    imgArray = Image.open(image)
    img_array = np.array(imgArray)
    wc = WordCloud(
        background_color='white',
        mask=img_array,
        font_path="static/fonts/" + font
    )
    wc.generate_from_text(string)
    fig = plt.figure(1)
    plt.imshow(wc)
    plt.axis('off')
    # Overwrites the uploaded file with the rendered cloud.
    plt.savefig(image, dpi=dpi)
    # Close the figure: the original leaked one figure per request in the
    # long-running Flask process.
    plt.close(fig)
    return "success"
def generateRec(height, width):
    """Render a width x height rectangular word cloud from the movie
    summaries and save it to static/images/word-rectangle.png.

    Returns "success".
    """
    connect = sqlite3.connect("static/file/movie250.db")
    cursor = connect.cursor()
    sql = "select introduction from movie250"
    data = cursor.execute(sql)
    text = ""
    for item in data:
        text = text + item[0]
    cursor.close()
    connect.close()
    cut = jieba.cut(text)
    string = " ".join(cut)
    wc = WordCloud(font_path='static/fonts/Kaiti.ttc', width=width, height=height,
                   mode='RGBA', background_color=None).generate(string)
    # The original previewed via plt.imshow + plt.show(): in a web worker
    # plt.show() either blocks the request (GUI backend) or is a no-op, and
    # it never affected the saved file — WordCloud.to_file writes the PNG
    # directly, so the preview was removed.
    wc.to_file('static/images/word-rectangle.png')
    return "success"
-
网站部分 app.py from flask import Flask, render_template, request, send_from_directory, jsonify, redirect, url_for
from werkzeug.utils import secure_filename
import sqlite3, datetime, random, spider, os
import word as wordImage
# Flask application object. JSON_AS_ASCII=False makes jsonify emit raw
# UTF-8 (Chinese titles) instead of \uXXXX escapes.
app = Flask(__name__)
app.config['JSON_AS_ASCII'] = False
@app.route('/')
def index():
    """Home page: render index.html with the total rating count summed
    over all movies."""
    connect = sqlite3.connect("static/file/movie250.db")
    cursor = connect.cursor()
    total = 0
    try:
        sql = "select rated from movie250"
        for row in cursor.execute(sql):
            total += int(row[0])
    finally:
        # The original leaked the cursor and connection on every request.
        cursor.close()
        connect.close()
    return render_template('index.html', total=total)
@app.route('/index')
def home():
    # /index is an alias for the root route; reuse its handler.
    return index()
@app.route('/movie')
def movie():
    """Render the movie list page."""
    template_name = 'movie.html'
    return render_template(template_name)
@app.route('/getMovie', methods=["GET"])
def getMovie():
    """Paged JSON endpoint backing the movie table.

    Query params: page (1-based page number), limit (page size).
    Returns a JSON array of row dicts.
    NOTE(review): the 'info_lin' key looks like a typo for 'info_link'
    but is kept byte-identical — the front end may depend on it.
    """
    page = int(request.args.get('page'))
    limit = int(request.args.get('limit'))
    keys = ('id', 'info_lin', 'pic_link', 'c_name', 'e_name', 'other_name',
            'director', 'country', 'types', 'years', 'score', 'rated',
            'introduction')
    connect = sqlite3.connect("static/file/movie250.db")
    cursor = connect.cursor()
    try:
        # Parameterized LIMIT offset,count replaces the string-built SQL.
        rows = cursor.execute("select * from movie250 limit ?,?",
                              ((page - 1) * limit, limit))
        dataList = [dict(zip(keys, row)) for row in rows]
    finally:
        cursor.close()
        connect.close()
    return jsonify(dataList)
@app.route('/statics')
def score():
    """Statistics page: build four chart datasets from movie250.db.

    chart1 -> [scores, counts]                     (rating distribution)
    chart2 -> [{"name": country, "value": n}, ...] (pie chart)
    chart3 -> [years, counts]                      (movies per year)
    chart4 -> [{"name": genre, "value": n}, ...]   (pie chart)

    The try/str.index/except ValueError constructs below are data-cleaning
    hacks: str.index raises ValueError when the needle is absent, so the
    except branch handles the normal rows, while rows polluted with a stray
    year/country string (bad scrape rows) fall through and are skipped.
    """
    connect = sqlite3.connect("static/file/movie250.db")
    cursor = connect.cursor()
    # chart1: how many movies received each score
    chart1 = [[], []]
    sql1 = "select score, count(score) from movie250 group by score"
    data1 = cursor.execute(sql1)
    for item in data1:
        chart1[0].append(item[0])
        chart1[1].append(item[1])
    # chart2: movie count per country (a row may list several countries)
    sql2 = "select country from movie250"
    data2 = cursor.execute(sql2)
    chart2Dic = {}
    for item in data2:
        countryList = item[0].split(" ")
        for country in countryList:
            try:
                # entries containing "1964" are a known bad row -> skip
                if country.index("1964"):
                    continue
            except ValueError:
                try:
                    chart2Dic[country] = chart2Dic[country] + 1
                except KeyError:
                    chart2Dic[country] = 1
    chart2 = []
    for key, value in chart2Dic.items():
        tempDic = {}
        tempDic["value"] = value
        tempDic["name"] = key
        chart2.append(tempDic)
    # chart3: movie count per year
    chart3 = [[], []]
    sql3 = "select years, count(years) from movie250 group by years"
    data3 = cursor.execute(sql3)
    for item in data3:
        try:
            # a year cell containing "中国大陆" is a bad row -> skip
            if item[0].index("中国大陆"):
                continue
        except ValueError:
            chart3[0].append(int(item[0]))
            chart3[1].append(item[1])
    # chart4: movie count per genre (a row may list several genres)
    sql4 = "select types from movie250"
    data4 = cursor.execute(sql4)
    chart4Dic = {}
    for item in data4:
        typeList = item[0].split(" ")
        try:
            # a genre list containing "1978(中国大陆)" is a bad row -> skip
            if typeList.index("1978(中国大陆)"):
                continue
        except ValueError:
            for types in typeList:
                try:
                    chart4Dic[types] = chart4Dic[types] + 1
                except KeyError:
                    chart4Dic[types] = 1
    chart4 = []
    for key, value in chart4Dic.items():
        tempDic = {}
        tempDic["value"] = value
        tempDic["name"] = key
        chart4.append(tempDic)
    cursor.close()
    connect.close()
    return render_template('score.html', chart1=chart1, chart2=chart2, chart3=chart3, chart4=chart4)
@app.route('/word')
def word():
    """Render the word-cloud page."""
    page_template = 'word.html'
    return render_template(page_template)
@app.route('/spider')
def spiderFunction():
    # Kick off a full re-scrape; responds {"state": "success"} or
    # {"state": "false"}. NOTE(review): runs synchronously — the HTTP
    # request blocks until the whole scrape finishes.
    state = spider.spiderStart()
    return jsonify({"state": state})
@app.route('/temp')
def temp():
    """Render the temp page."""
    template_name = 'temp.html'
    return render_template(template_name)
@app.errorhandler(404)
def handle_404_error(err_msg):
    # Any unknown URL renders the shared error page; err_msg is unused.
    return render_template('error.html')
@app.route('/download_excel')
def download_excel():
    """Send the generated Excel file as a download attachment."""
    directory = os.path.join(app.root_path, 'static', 'file')
    return send_from_directory(directory, "豆瓣电影Top250.xlsx", as_attachment=True)
@app.route('/download_db')
def download_db():
    """Send the SQLite database as a download attachment."""
    directory = os.path.join(app.root_path, 'static', 'file')
    return send_from_directory(directory, 'movie250.db', as_attachment=True)
@app.route('/generateImg', methods=['GET', 'POST'])
def toGenerateImg():
    """Render the word-cloud image generation (upload) page."""
    template_name = 'generate_wordcloud.html'
    return render_template(template_name)
# Relative folder (under the app root) where uploads are stored.
IMAGE_FOLDER = 'static/file/upload'

# Extensions accepted for upload (compared case-insensitively).
ALLOWED_IMAGE_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'bmp'}


def gen_rnd_filename():
    """Return a collision-resistant base name: 14-digit timestamp plus
    4 random digits (lambda assignments replaced with defs per PEP 8)."""
    return "%s%s" % (
        datetime.datetime.now().strftime('%Y%m%d%H%M%S'),
        str(random.randrange(1000, 10000)))


def allowed_file(filename):
    """True if *filename* has an allowed image extension.

    The comparison is now case-insensitive, so e.g. "photo.PNG" is
    accepted (backward-compatible generalization of the original).
    """
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_IMAGE_EXTENSIONS
# Upload configuration: a random SECRET_KEY per process (sessions do not
# survive restarts), uploads stored under static/file/upload, request
# bodies capped at 16 MB.
app.config.update(
    SECRET_KEY=os.urandom(24),
    UPLOAD_FOLDER=os.path.join(app.root_path, IMAGE_FOLDER),
    MAX_CONTENT_LENGTH=16 * 1024 * 1024
)
@app.route('/showimg/<filename>')
def showimg_view(filename):
    """Serve an uploaded/generated image from the upload folder."""
    upload_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(upload_dir, filename)
@app.route('/upload/', methods=['POST', 'OPTIONS'])
def upload_view():
    """Accept an image upload, word-cloud-ify it in place, return its URL.

    Response JSON: {"code": 0, "data": {"src": url}} on success,
    {"code": -1, "msg": ...} on a missing file or disallowed extension.
    """
    res = dict(code=-1, msg=None)
    f = request.files.get('file')
    if f and allowed_file(f.filename):
        filename = secure_filename(gen_rnd_filename() + "." + f.filename.split('.')[-1])
        # exist_ok avoids the original's check-then-create race.
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        f.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        # generateImage overwrites the saved upload with its word-cloud
        # rendering using the bundled font.
        wordImage.generateImage(IMAGE_FOLDER+'/'+filename, '娃娃体.otf', 800)
        imgUrl = url_for('showimg_view', filename=filename, _external=True)
        res.update(code=0, data=dict(src=imgUrl))
    else:
        res.update(msg="Unsuccessfully obtained file or format is not allowed")
    return jsonify(res)
if __name__ == '__main__':
    # Flask development server with defaults (127.0.0.1:5000); use a
    # proper WSGI server for deployment.
    app.run()