IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 系统运维 -> 用 Python 画一张北京实时疫情地图:从爬虫到绘图到自动上传服务器 -> 正文阅读

[系统运维]用 Python 画一张北京实时疫情地图:从爬虫到绘图到自动上传服务器

效果

在线地图

闲谈

我校传统是五一放七天,五一前两天开始放,当时疫情初见端倪,但是网上并没有找到准确显示病例位置的地图

后来知道腾讯地图是有的,但是不支持选择时间区间,所以把很早之前的也都显示了出来,整个地图很乱,几乎没有实际价值

所以出去玩前花了2个小时做了个简易版的地图,然后就快快乐乐和人出去了。最近又加上了爬虫,自动上传服务器,高德API并发,觉得可以把这个大杂烩发出来了

需求

  1. 数据爬取:北京卫健委,BeautifulSoup
  2. 坐标获取:高德地图API
  3. 地图绘制:Folium
  4. 自动上传服务器:paramiko

数据爬取

注意

需要说在前面的是,适度爬虫,不要给服务器造成压力,比如做好本地存储避免重复访问,限制爬虫频率等,我这里限制了至少间隔 1s 才能访问一次

元素定位

爬虫都是就题发挥,这里提取所有 a标签,然后筛选出 title 包含“日”、“月”、“新增”、“例”的。值得注意的是,北京卫健委有很简单的反爬虫机制,你需要把 User-Agent 伪装一下

def update(url='http://wjw.beijing.gov.cn/xwzx_20031/wnxw/'):
    '''北京卫健委爬虫'''
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64 AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36"}
    bs = BeautifulSoup(
        kind_get(url, headers=headers).text, 'lxml')
    for news in bs.find_all('a'):
        try:
            title = news['title']
        except KeyError:
            pass
        else:
            if title.find('月') != -1 and title.find('日') != -1 and title.find('例') != -1:
                print(title)

然后把月和日用正则表达式提取出来,再访问 href ,进一步把所有含有"现住"的 p标签 提取出来

 date = re.search('(?<=北京).*?(?=新增)', title)
 if date:
     m, d, _ = re.split("[月日]", date.group())
     path = './data/{}{}.txt'.format(m.zfill(2), d.zfill(2))
     if not os.path.exists(path):
         bs_news = BeautifulSoup(
             kind_get(parse.urljoin(url, news['href']), headers=headers).text, 'lxml')
         try:
             with open(path, 'w', encoding='utf-8') as file:
                 article = bs_news.find('div', class_='view')
                 for item in article.find_all('p'):
                     if item.text.find('现住') != -1:
                         file.write(item.text+'\n')
                 print(date.group(), '数据文件添加成功')
         except AttributeError:
             print(article)
             print(date.group(), '数据文件添加异常,可能当日病例没有明确地址')
             os.remove(path)
     else:
         print(date.group(), '数据文件已存在')

提取出来的数据大概长这样

确诊病例1、3、4、19:现住朝阳区劲松街道农光里。5月6日诊断为确诊病例,临床分型均为轻型。

确诊病例2:现住朝阳区高碑店西店社区义安门兰花巷。5月6日诊断为确诊病例,临床分型为轻型。

确诊病例5、6、7:现住海淀区甘家口街道甘东社区。5月6日诊断为确诊病例,临床分型均为轻型。

确诊病例8:现住海淀区甘家口街道增光佳苑。5月6日诊断为确诊病例,临床分型为轻型。

确诊病例9:现住海淀区西北旺镇大牛坊社区。5月6日诊断为确诊病例,临床分型为轻型。

数据分析

接下来就是拆分出每个病例的住址了,这里最普遍的格式是 确诊病例x,现住xxxxx。,但是还有很多奇奇怪怪的格式

  • 确诊病例123,现住xxxxx。

    用正则表达式提取“现住”和“。”的中间

  • 确诊病例1、45、47至48,现住xxxx。

    这里病例数不是 1,先用正则匹配出“病例”和“,”中间,然后先按“、"分隔一次,再用至分隔一次,然后一个个加起来

  • 确诊病例3,现住址和1相同。

    与其倒回去找1,不如在病例 1 的时候用正则表达式提取整个文章出现几次 1。这里提取 1 有一点点难度,因为要排除 12 这种,我编写的正则表达式是 f"(?<=[^0-9]){int(ID)}(?=[^0-9])",ID是对应病例编号,这个表达式的意义是找 ID 且左右两边不是数字。

def getCaseAddress(date):
    '''对卫健委的报告文本进行处理,date是文件名列表,提取格式为 date caseId address'''
    caseAddress = []
    for d in date:
        path = './data/'+d+'.txt'
        if not os.path.exists(path):
            continue
        with open(path, encoding='utf-8') as f:
            text = f.read()
            tmp = text.replace('\n\n', '\n').split('\n')
            sum = 0
            for i, t in enumerate(tmp):
                if t:
                    if t.find('相同') != -1:
                        continue
                    caseID = re.search('(?<=确诊病例).*?(?=:)', t.strip())
                    num = 0
                    if caseID:
                        caseIDs = caseID.group().split('、')
                        for ID in caseIDs:
                            if ID.find('至') != -1:
                                x, y = ID.split('至')
                                num += int(y)-int(x)+1
                            else:
                                num += len(re.findall(
                                    f"(?<=[^0-9]){int(ID)}(?=[^0-9])", t.strip()))
                    else:
                        num = 1
                    if t.find('现住址均位于') != -1:
                        mat = re.search('(?<=现住址均位于).*?(?=[,。])', t.strip())
                    elif t.find('现住址位于') != -1:
                        mat = re.search('(?<=现住址位于).*?(?=[,。])', t.strip())
                    else:
                        mat = re.search('(?<=现住).*?(?=[,。])', t.strip())
                    if mat:
                        for i in range(num):
                            caseAddress.append([d, i+1, mat.group()])
                    else:
                        print(t)
                        print('fali')
                    sum += num
            # print(d,sum)
    return np.array(caseAddress, dtype=object)

获取地理位置

我们提取到 ["朝阳区劲松街道农光里","朝阳区高碑店西店社区义安门兰花巷",...] 这样的地址后就要把它转换成经纬度才能显示在地图上,这里我选择调用高德地图API,免费且完全够用。

高德地图有给模板,照抄就行,这里开启多地址,一次调用可以同时查询 10 个地址的经纬度

多地址的格式是 "朝阳区劲松街道农光里|朝阳区高碑店西店社区义安门兰花巷" 这样

返回 json 格式,python 有库可以处理成 dict 的样子

def getCode(address):
    '''由地址获取经纬度,调用高德地图API,配额300000/day'''
    codeJson = json.loads(get(
        f'https://restapi.amap.com/v3/geocode/geo?address={address}&batch=true&city=beijing&output=JSON&key={amapkey}').text)['geocodes']

    return [[pos['formatted_address'], [float(x) for x in pos['location'].split(',')[::-1]]] for pos in codeJson]

然后我们只要 10 个 10 个把格式弄好询问就行,实测 700 个地址问 70 次,如果是单线程要 5s 才能问完,所以下面给出多线程的版本,耗时 0.6s,如果不在意这一点时间,其实单线程也不是很慢

def thread_task(Q_input: Queue, Q_output: Queue):
    while not Q_input.empty():
        url = Q_input.get()
        Q_output.put(getCode(url))


def getCaseCode(caseAddress):
    '''对 caseAddress 进一步处理,返回格式为 date caseId address formatAddress code'''
    batchSize = 10
    bg = 0
    ed = len(caseAddress) if bg+batchSize >= len(caseAddress) else bg+batchSize
    webAPI = []
    Q = Queue()
    Q_output = Queue()
    while ed <= len(caseAddress):
        addressBatch = caseAddress[bg:ed, 2]
        addressBatchAtr = '|'.join(addressBatch)
        Q.put(addressBatchAtr)
        if ed == len(caseAddress):
            break
        bg = bg+batchSize
        ed = len(caseAddress) if bg + \
            batchSize >= len(caseAddress) else bg+batchSize\

    pool = []
    for i in range(64):
        t = Thread(target=thread_task, args=(
            Q, Q_output,))
        t.start()
        pool.append(t)

    for t in pool:
        t.join()

    while not Q_output.empty():
        webAPI.extend(Q_output.get())

    webAPI = np.array(webAPI, dtype=object)
    return np.c_[caseAddress, webAPI]

绘制地图

最后我们得到了形如 date caseId address formatAddress code 的处理后的数据,分别是日期,病例编号,通报地址,高德API返回的标准地址,高德API返回的经纬度

画地图我用的是 folium,他还可以导出成 html,然后我给这个 html 文件加了个标题

Circle 是画圆,Marker 是画标记,HeatMap 可以根据散点画热力图

mytiles 是地图底图,这里是之前在 Mapbox 自定义过一个,我感觉比原版更好看一点点就拿来用了,也可以用默认的

my_tiles = 'https://api.mapbox.com/styles/v1/xm-jarden/ckzv42k9u000o14p84ie307mj/tiles/256/{z}/{x}/{y}@2x?access_token=pk.eyJ1IjoieG0tamFyZGVuIiwiYSI6ImNreWl5ZGpzcTF4cXEyeGxrZ3h6MTlxdmQifQ.V39RuqOFz3oq2rFMU3XCNA'
# 以人大为中心
map = folium.Map(location=[39.968448, 116.311529], tiles=my_tiles,
                 attr='Mapbox', zoom_start=11)
folium.Circle([39.968448, 116.311529], 500,).add_to(map)
for case in caseCode:
    folium.Marker(case[-1], popup=case[0]+case[2]).add_to(map)
HeatMap(caseCode[:, -1], radius=40).add_to(map)
map.save('map.html')
with open('map.html', 'a+', encoding='utf-8') as file:
    file.write('\n<head><title>{}</title></head>'.format(
        datetime.datetime.strftime(datetime.datetime.now(), '%m月%d日%H:%M:%S 自动更新')))

上传服务器

因为我刚好有个个人网站,顺手把地图挂上去了,但每次更新地图重新传文件甚是麻烦,发现可以用 paramiko 库自动化,而且非常简单

ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname='45.76.120.215', port=22,
            username='root', password=sshpassword)
sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
path = "/www/wwwroot/www.panjd.tech/map.html"
sftp.put('./map.html', path)

完整主程序

GitHub仓库更完整 https://github.com/panjd123/BeijingPandemicMap

想要本地体验的朋友可以把最后几行服务器部分的代码删了,然后去高德地图申请一个免费的APIkey,替换掉配置文件里的就行

from threading import Thread
from time import sleep
from bs4 import BeautifulSoup
import paramiko
import folium
from folium.plugins import HeatMap
from requests import get
import pandas as pd
import datetime
import json
import re
import numpy as np
import os
from timeit import default_timer as timer
from urllib import parse
from queue import Queue
import configparser


class KindGet:
    interval = 1
    tic = 0

    def __init__(self, _interval=1):
        self.interval = _interval
        self.tic = timer()

    def __call__(self, url, headers=""):
        sleep(max(0, timer()-self.tic))
        ret = get(url=url, headers=headers)
        self.tic = timer()
        return ret


def datelist(beginDate, endDate):
    # beginDate, endDate是形如‘20160601’的字符串或datetime格式
    return [datetime.datetime.strftime(x, '%m%d')
            for x in list(pd.date_range(start=beginDate, end=endDate))]


kind_get = KindGet()
config = configparser.ConfigParser()
config.read("./config.ini")
# 高德地图APIkey
amapkey = config.get("panjd123", "amapkey")
# 服务器密码
sshpassword = config.get("panjd123", "sshpassword")

my_tiles = 'https://api.mapbox.com/styles/v1/xm-jarden/ckzv42k9u000o14p84ie307mj/tiles/256/{z}/{x}/{y}@2x?access_token=pk.eyJ1IjoieG0tamFyZGVuIiwiYSI6ImNreWl5ZGpzcTF4cXEyeGxrZ3h6MTlxdmQifQ.V39RuqOFz3oq2rFMU3XCNA'
# 以人大为中心
map = folium.Map(location=[39.968448, 116.311529], tiles=my_tiles,
                 attr='Mapbox', zoom_start=11)
folium.Circle([39.968448, 116.311529], 500,).add_to(map)


def update_all(index=2):
    for i in range(index):
        if i:
            update(f'http://wjw.beijing.gov.cn/xwzx_20031/wnxw/index_{i}.html')
        else:
            update()


def update(url='http://wjw.beijing.gov.cn/xwzx_20031/wnxw/'):
    '''北京卫健委爬虫'''
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36",
    }
    bs = BeautifulSoup(
        kind_get(url, headers=headers).text, 'lxml')
    for news in bs.find_all('a'):
        try:
            title = news['title']
        except KeyError:
            pass
        else:
            if title.find('月') != -1 and title.find('日') != -1 and title.find('例') != -1:
                print(title)
                date = re.search('(?<=北京).*?(?=新增)', title)
                if date:
                    m, d, _ = re.split("[月日]", date.group())
                    path = './data/{}{}.txt'.format(m.zfill(2), d.zfill(2))
                    if not os.path.exists(path):
                        bs_news = BeautifulSoup(
                            kind_get(parse.urljoin(url, news['href']), headers=headers).text, 'lxml')
                        try:
                            with open(path, 'w', encoding='utf-8') as file:
                                article = bs_news.find('div', class_='view')
                                for item in article.find_all('p'):
                                    if item.text.find('现住') != -1:
                                        file.write(item.text+'\n')
                                print(date.group(), '数据文件添加成功')
                        except AttributeError:
                            print(article)
                            print(date.group(), '数据文件添加异常,可能当日病例没有明确地址')
                            os.remove(path)
                    else:
                        print(date.group(), '数据文件已存在')


def mark(address):
    '''输入一个地址,标记一个位置'''
    x, y = json.loads(get(
        f'https://restapi.amap.com/v3/geocode/geo?address={address}&batch=true&city=beijing&output=JSON&key={amapkey}').text)['geocodes'][0]['location'].split(',')
    folium.Circle([y, x], 500).add_to(map)


def getCode(address):
    '''由地址获取经纬度,调用高德地图API,配额300000/day'''
    codeJson = json.loads(get(
        f'https://restapi.amap.com/v3/geocode/geo?address={address}&batch=true&city=beijing&output=JSON&key={amapkey}').text)['geocodes']

    # print(address)
    # print([pos['location'] for pos in codeJson])

    return [[pos['formatted_address'], [float(x) for x in pos['location'].split(',')[::-1]]] for pos in codeJson]


def getCaseAddress(date):
    '''对卫健委的报告文本进行处理,date是文件名列表,提取格式为 date caseId address'''
    caseAddress = []
    for d in date:
        path = './data/'+d+'.txt'
        if not os.path.exists(path):
            continue
        with open(path, encoding='utf-8') as f:
            text = f.read()
            tmp = text.replace('\n\n', '\n').split('\n')
            sum = 0
            for i, t in enumerate(tmp):
                if t:
                    if t.find('相同') != -1:
                        continue
                    caseID = re.search('(?<=确诊病例).*?(?=:)', t.strip())
                    num = 0
                    if caseID:
                        caseIDs = caseID.group().split('、')
                        for ID in caseIDs:
                            if ID.find('至') != -1:
                                x, y = ID.split('至')
                                num += int(y)-int(x)+1
                            else:
                                num += len(re.findall(
                                    f"(?<=[^0-9]){int(ID)}(?=[^0-9])", t.strip()))
                    else:
                        num = 1
                    if t.find('现住址均位于') != -1:
                        mat = re.search('(?<=现住址均位于).*?(?=[,。])', t.strip())
                    elif t.find('现住址位于') != -1:
                        mat = re.search('(?<=现住址位于).*?(?=[,。])', t.strip())
                    else:
                        mat = re.search('(?<=现住).*?(?=[,。])', t.strip())
                    if mat:
                        for i in range(num):
                            caseAddress.append([d, i+1, mat.group()])
                    else:
                        print(t)
                        print('fali')
                    sum += num
            # print(d,sum)
    return np.array(caseAddress, dtype=object)


def thread_task(Q_input: Queue, Q_output: Queue):
    while not Q_input.empty():
        url = Q_input.get()
        Q_output.put(getCode(url))


def getCaseCode(caseAddress):
    '''对 caseAddress 进一步处理,返回格式为 date caseId address formatAddress code'''
    batchSize = 10
    bg = 0
    ed = len(caseAddress) if bg+batchSize >= len(caseAddress) else bg+batchSize
    webAPI = []
    Q = Queue()
    Q_output = Queue()
    while ed <= len(caseAddress):
        addressBatch = caseAddress[bg:ed, 2]
        addressBatchAtr = '|'.join(addressBatch)
        Q.put(addressBatchAtr)
        if ed == len(caseAddress):
            break
        bg = bg+batchSize
        ed = len(caseAddress) if bg + \
            batchSize >= len(caseAddress) else bg+batchSize\

    pool = []
    for i in range(64):
        t = Thread(target=thread_task, args=(
            Q, Q_output,))
        t.start()
        pool.append(t)

    for t in pool:
        t.join()

    while not Q_output.empty():
        webAPI.extend(Q_output.get())

    webAPI = np.array(webAPI, dtype=object)
    return np.c_[caseAddress, webAPI]


tic = timer()
print('正在从北京卫健委更新数据')
update_all()
toc = timer()
print(f'更新完成,用时 {toc-tic} s')


tic = timer()
print('正在处理和获取病例坐标信息')
date = datelist(datetime.datetime.now() -
                datetime.timedelta(days=14), datetime.datetime.now())
print('处理日期列表', date)
caseAddress = getCaseAddress(date)
caseCode = getCaseCode(caseAddress)
toc = timer()
print(f'处理结束,数据量 {len(caseCode)} 条,用时 {toc-tic} s')


tic = timer()
print('正在生成和保存地图')
for case in caseCode:
    folium.Marker(case[-1], popup=case[0]+case[2]).add_to(map)
HeatMap(caseCode[:, -1], radius=40).add_to(map)
map.save('map.html')
with open('map.html', 'a+', encoding='utf-8') as file:
    file.write('\n<head><title>{}</title></head>'.format(
        datetime.datetime.strftime(datetime.datetime.now(), '%m月%d日%H:%M:%S 自动更新')))
toc = timer()
print(f'保存成功,用时 {toc-tic} s')


tic = timer()
print('正在上传到panjd.tech')
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(hostname='45.76.120.215', port=22,
            username='root', password=sshpassword)
sftp = paramiko.SFTPClient.from_transport(ssh.get_transport())
path = "/www/wwwroot/www.panjd.tech/map.html"
sftp.put('./map.html', path)
toc = timer()
print(f'上传成功,用时 {toc-tic} s')
map

  系统运维 最新文章
配置小型公司网络WLAN基本业务(AC通过三层
如何在交付运维过程中建立风险底线意识,提
快速传输大文件,怎么通过网络传大文件给对
从游戏服务端角度分析移动同步(状态同步)
MySQL使用MyCat实现分库分表
如何用DWDM射频光纤技术实现200公里外的站点
国内顺畅下载k8s.gcr.io的镜像
自动化测试appium
ctfshow ssrf
Linux操作系统学习之实用指令(Centos7/8均
上一篇文章      下一篇文章      查看所有文章
加:2022-05-10 12:16:32  更:2022-05-10 12:19:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/2 0:46:28-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码