在之前的工作里我实现了批量下载文件和批量写入elasticsearch的功能,具体可以看这个博客:
实习工作小结·批量下载文件和批量写入elasticsearchhttps://blog.csdn.net/qq_41938259/article/details/123862320?spm=1001.2014.3001.5501总的思路是,通过lxml解析器对原来的xml文件进行解析,提取出有用的信息,并导入到elasticsearch中去。下载模块是通过观察wget下载链接的规律,通过拆分字符串实现批量下载功能。然而经过领导的提醒和要求,我发现了自己的不足,我们需要做一个全自动化的脚本程序,只要一运行它就会自动比对没有下载的内容进行下载,而不是通过for循环傻傻地挨个下载。
那具体怎么实现呢,我得想法是先用requests模块(一个类似urlib模块的东西,如果你没用过他你可以大致把它想成urlib模块),去获取网站所有xml文件地名称,再和本地已经下载的文件的名称进行比对就行。获取本地文件列表的大致想法是用glob.glob模块进行。思路有了,于是就可以写代码了:
先获取网站的所有文件名,baseline页面是比较旧的文件,他不再更新,而daily是会时常更新的。代码如下:
# List of pending documents
# 先同步baseline的文件
root_url = "http://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
# 从baseline网页获取所有文件的列表以供比对
html = requests.get(root_url).content.decode()
addresses = etree.HTML(html).xpath("//html/body/pre/a/text()")
# 再同步daily的文件
root_url_2 = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"
# 从daily网页获取所有文件的列表以供比对
html = requests.get(root_url_2).content.decode()
addresses_2 = etree.HTML(html).xpath("//html/body/pre/a/text()")
# 将网页文件列表元素转为string类型,以方便后续操作,并提取.xml.gz结尾的文件
web_address = []
web_address_2 = []
for add in addresses:
if str(add).endswith(".xml.gz"):
web_address.append(str(add))
for add in addresses_2:
if str(add).endswith(".xml.gz"):
web_address_2.append(str(add))
同样地,上述使用lxml模块进行网页的xpath解析,得到所有符合要求的文件名。接着用glob获取本地文件名:
# 检测本地已经下载的文件名,并生成列表
local_address = []
tmp_daily = glob.glob('daily/*.xml')
for file in tmp_daily:
# 去掉daily/前缀,以便后续比较
local_address.append(file.replace("daily\\", ""))
tmp_baseline = glob.glob('baseline/*.xml')
for file in tmp_baseline:
# 去掉baseline/前缀,以便后续比较
local_address.append(file.replace("baseline\\", ""))
接下来就是比较环节了:
for address in web_address:
# 去掉.gz的后缀,然后进行比对,如果出现在本地说明已经下载过了。
if address.replace(".gz", "") not in local_address:
print(address.replace(".gz", ""))
download_data(address, root_url, "baseline")
for address in web_address_2:
if address.replace(".gz", "") not in local_address:
print(address.replace(".gz", ""))
download_data(address, root_url_2, "daily")
这个函数的完整代码如下:
def main_func():
# List of pending documents
# 先同步baseline的文件
root_url = "http://ftp.ncbi.nlm.nih.gov/pubmed/baseline/"
# 从baseline网页获取所有文件的列表以供比对
html = requests.get(root_url).content.decode()
addresses = etree.HTML(html).xpath("//html/body/pre/a/text()")
# 再同步daily的文件
root_url_2 = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/"
# 从daily网页获取所有文件的列表以供比对
html = requests.get(root_url_2).content.decode()
addresses_2 = etree.HTML(html).xpath("//html/body/pre/a/text()")
# print(addresses_2)
# 将网页文件列表元素转为string类型,以方便后续操作
web_address = []
web_address_2 = []
for add in addresses:
if str(add).endswith(".xml.gz"):
web_address.append(str(add))
for add in addresses_2:
if str(add).endswith(".xml.gz"):
web_address_2.append(str(add))
# 检测本地已经下载的文件名,并生成列表
local_address = []
tmp_daily = glob.glob('daily/*.xml')
for file in tmp_daily:
# 去掉daily/前缀,以便后续比较
local_address.append(file.replace("daily\\", ""))
tmp_baseline = glob.glob('baseline/*.xml')
for file in tmp_baseline:
# 去掉baseline/前缀,以便后续比较
local_address.append(file.replace("baseline\\", ""))
print(local_address)
for address in web_address:
# 去掉.gz的后缀,然后进行比对,如果出现在本地说明已经下载过了。
if address.replace(".gz", "") not in local_address:
print(address.replace(".gz", ""))
download_data(address, root_url, "baseline")
for address in web_address_2:
if address.replace(".gz", "") not in local_address:
print(address.replace(".gz", ""))
download_data(address, root_url_2, "daily")
细心的童鞋会看到一个download_data函数,这是用来下载文件用的,具体代码给出。里面还包含了一些细节,例如删掉垃圾文件、解压等步骤,总之这个函数的逻辑不会是很难。
def download_data(address, root_url, dir_name):
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# process the file
try:
# 开始下载
wget.download(f"{root_url}{address}", out=f"{dir_name}/{address}")
# print(f"{root_url}{address}")
# 解压
os.system("gunzip -d " + f"{dir_name}/{address}")
logger.info(address + f" finished!")
except Exception as e:
logger.info(address + f" download Failed!!! Retry.")
logger.info(e)
# .tmp是下载到一半的文件,然偶它断开连接了,就会产生一个.tmp文件,我们把它删掉
if len(glob.glob('baseline/*.tmp')) != 0:
for file in glob.glob('baseline/*.tmp'):
os.remove(file)
if len(glob.glob('daily/*.tmp')) != 0:
for file in glob.glob('daily/*.tmp'):
os.remove(file)
# 删完上次的垃圾文件,继续下载
download_data(address, root_url, dir_name)
接着是该进写入到elasticsearch的功能。之前导入elasticsearch都是通过for循环傻傻的一个个挨着导入。每次带入都要改for循环的数字,很是麻烦。这次也做一个自动比对功能。具体思路是每写完一个文件就把这个文件的名子输出到日志文件,于是每次比对日志文件和已经下载好的文件的差集就是需要下载的文件了!
if __name__ == '__main__':
# 读取已经加载的列表
already_loaded = []
files = open("log_es_loaded.txt").readlines()
for file in files:
# print(file.replace("\n", ""))
already_loaded.append(file.replace("\n", ""))
# 检测本地已经下载的文件名,并生成列表
local_address = []
tmp_daily = glob.glob('daily/*.xml')
for file in tmp_daily:
local_address.append(file)
tmp_baseline = glob.glob('baseline/*.xml')
for file in tmp_baseline:
local_address.append(file)
# process the file
logging.basicConfig()
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler("log_es_loaded.txt")
logger.addHandler(handler)
for file in local_address:
# 如果已经加载了
if file in already_loaded:
# 就直接跳过
pass
# 反之
else:
# 加载文件
get_data(file)
logger.info(file)
逻辑和注释也很清楚。get_data是将文件信息导入到elasticsearch中的功能,之前已经介绍过,改动不大这里不再赘述。
以上就是这么多!
END
|