IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 实习工作小结——下载和导入Elasticsearch模块实现自动比对的功能 -> 正文阅读

[大数据]实习工作小结——下载和导入Elasticsearch模块实现自动比对的功能

在之前的工作里我实现了批量下载文件和批量写入elasticsearch的功能,具体可以看这个博客:

实习工作小结·批量下载文件和批量写入elasticsearchicon-default.png?t=M3C8https://blog.csdn.net/qq_41938259/article/details/123862320?spm=1001.2014.3001.5501总的思路是,通过lxml解析器对原来的xml文件进行解析,提取出有用的信息,并导入到elasticsearch中去。下载模块是通过观察wget下载链接的规律,通过拆分字符串实现批量下载功能。然而经过领导的提醒和要求,我发现了自己的不足,我们需要做一个全自动化的脚本程序,只要一运行它就会自动比对没有下载的内容进行下载,而不是通过for循环傻傻地挨个下载。

那具体怎么实现呢,我得想法是先用requests模块(一个类似urlib模块的东西,如果你没用过他你可以大致把它想成urlib模块),去获取网站所有xml文件地名称,再和本地已经下载的文件的名称进行比对就行。获取本地文件列表的大致想法是用glob.glob模块进行。思路有了,于是就可以写代码了:

先获取网站的所有文件名,baseline页面是比较旧的文件,他不再更新,而daily是会时常更新的。代码如下:

    # List of pending documents
    # 先同步baseline的文件
    root_url = "http://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
    # 从baseline网页获取所有文件的列表以供比对
    html = requests.get(root_url).content.decode()
    addresses = etree.HTML(html).xpath("//html/body/pre/a/text()")

    # 再同步daily的文件
    root_url_2 = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"
    # 从daily网页获取所有文件的列表以供比对
    html = requests.get(root_url_2).content.decode()
    addresses_2 = etree.HTML(html).xpath("//html/body/pre/a/text()")

    # 将网页文件列表元素转为string类型,以方便后续操作,并提取.xml.gz结尾的文件
    web_address = []
    web_address_2 = []
    for add in addresses:
        if str(add).endswith(".xml.gz"):
            web_address.append(str(add))
    for add in addresses_2:
        if str(add).endswith(".xml.gz"):
            web_address_2.append(str(add))

同样地,上述使用lxml模块进行网页的xpath解析,得到所有符合要求的文件名。接着用glob获取本地文件名:

    # 检测本地已经下载的文件名,并生成列表
    local_address = []
    tmp_daily = glob.glob('daily/*.xml')
    for file in tmp_daily:
        # 去掉daily/前缀,以便后续比较
        local_address.append(file.replace("daily\\", ""))
    tmp_baseline = glob.glob('baseline/*.xml')
    for file in tmp_baseline:
        # 去掉baseline/前缀,以便后续比较
        local_address.append(file.replace("baseline\\", ""))

接下来就是比较环节了:

    for address in web_address:
        # 去掉.gz的后缀,然后进行比对,如果出现在本地说明已经下载过了。
        if address.replace(".gz", "") not in local_address:
            print(address.replace(".gz", ""))
            download_data(address, root_url, "baseline")
    for address in web_address_2:
        if address.replace(".gz", "") not in local_address:
            print(address.replace(".gz", ""))
            download_data(address, root_url_2, "daily")

这个函数的完整代码如下:

def main_func():
    # List of pending documents
    # 先同步baseline的文件
    root_url = "http://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
    # 从baseline网页获取所有文件的列表以供比对
    html = requests.get(root_url).content.decode()
    addresses = etree.HTML(html).xpath("//html/body/pre/a/text()")

    # 再同步daily的文件
    root_url_2 = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"
    # 从daily网页获取所有文件的列表以供比对
    html = requests.get(root_url_2).content.decode()
    addresses_2 = etree.HTML(html).xpath("//html/body/pre/a/text()")
    # print(addresses_2)

    # 将网页文件列表元素转为string类型,以方便后续操作
    web_address = []
    web_address_2 = []
    for add in addresses:
        if str(add).endswith(".xml.gz"):
            web_address.append(str(add))
    for add in addresses_2:
        if str(add).endswith(".xml.gz"):
            web_address_2.append(str(add))

    # 检测本地已经下载的文件名,并生成列表
    local_address = []
    tmp_daily = glob.glob('daily/*.xml')
    for file in tmp_daily:
        # 去掉daily/前缀,以便后续比较
        local_address.append(file.replace("daily\\", ""))
    tmp_baseline = glob.glob('baseline/*.xml')
    for file in tmp_baseline:
        # 去掉baseline/前缀,以便后续比较
        local_address.append(file.replace("baseline\\", ""))
    print(local_address)

    for address in web_address:
        # 去掉.gz的后缀,然后进行比对,如果出现在本地说明已经下载过了。
        if address.replace(".gz", "") not in local_address:
            print(address.replace(".gz", ""))
            download_data(address, root_url, "baseline")
    for address in web_address_2:
        if address.replace(".gz", "") not in local_address:
            print(address.replace(".gz", ""))
            download_data(address, root_url_2, "daily")

细心的童鞋会看到一个download_data函数,这是用来下载文件用的,具体代码给出。里面还包含了一些细节,例如删掉垃圾文件、解压等步骤,总之这个函数的逻辑不会是很难。

def download_data(address, root_url, dir_name):
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    # process the file

    try:
        # 开始下载
        wget.download(f"{root_url}{address}", out=f"{dir_name}/{address}")
        # print(f"{root_url}{address}")
        # 解压
        os.system("gunzip -d " + f"{dir_name}/{address}")
        logger.info(address + f" finished!")
    except Exception as e:
        logger.info(address + f" download Failed!!! Retry.")
        logger.info(e)
        # .tmp是下载到一半的文件,然偶它断开连接了,就会产生一个.tmp文件,我们把它删掉
        if len(glob.glob('baseline/*.tmp')) != 0:
            for file in glob.glob('baseline/*.tmp'):
                os.remove(file)
        if len(glob.glob('daily/*.tmp')) != 0:
            for file in glob.glob('daily/*.tmp'):
                os.remove(file)
        # 删完上次的垃圾文件,继续下载
        download_data(address, root_url, dir_name)

接着是该进写入到elasticsearch的功能。之前导入elasticsearch都是通过for循环傻傻的一个个挨着导入。每次带入都要改for循环的数字,很是麻烦。这次也做一个自动比对功能。具体思路是每写完一个文件就把这个文件的名子输出到日志文件,于是每次比对日志文件和已经下载好的文件的差集就是需要下载的文件了!

if __name__ == '__main__':

    # 读取已经加载的列表
    already_loaded = []
    files = open("log_es_loaded.txt").readlines()
    for file in files:
        # print(file.replace("\n", ""))
        already_loaded.append(file.replace("\n", ""))

    # 检测本地已经下载的文件名,并生成列表
    local_address = []
    tmp_daily = glob.glob('daily/*.xml')
    for file in tmp_daily:
        local_address.append(file)
    tmp_baseline = glob.glob('baseline/*.xml')
    for file in tmp_baseline:
        local_address.append(file)

    # process the file
    logging.basicConfig()
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    handler = logging.FileHandler("log_es_loaded.txt")
    logger.addHandler(handler)

    for file in local_address:
        # 如果已经加载了
        if file in already_loaded:
            # 就直接跳过
            pass
        # 反之
        else:
            # 加载文件
            get_data(file)
            logger.info(file)

逻辑和注释也很清楚。get_data是将文件信息导入到elasticsearch中的功能,之前已经介绍过,改动不大这里不再赘述。

以上就是这么多!

END

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-22 18:43:14  更:2022-04-22 18:46:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 12:40:13-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码