IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> python读取consul配置及用特定规则替换其中url -> 正文阅读

[大数据]python读取consul配置及用特定规则替换其中url

安装:

pip install python-consul
pip install pybase62
import re

import base62
import consul


class ConsulInfo:
    def __init__(self, router_adds):
        self.host = "consul.xxxx.local" # consul的url
        self.port = "xxxx" # 端口
        self.token = "xxxx" # 登录consul的token
        self.router_adds = "config/" + router_adds # 读取consul目的文件配置的url
        self.client = consul.Consul(self.host, self.port, token=self.token, verify=False) # 连接consul

    def replace_url(self, url_):
        #替换consul配置中url
        consul_data = self.get_consul_data()
        if not consul_data: return False
        consul_txt = ''
        for index, i in enumerate(consul_data.splitlines()):
        # 读取consul配置文件中每一行
            pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
            txt_ = re.findall(pattern, i) # 查找每行中是否有http://或者https://开头的url
            if not txt_ or txt_[0].startswith(url_):
            # 如果没有或者是已替换过的url,则直接添加
                consul_txt = consul_txt + i + "\n"
                continue
                
            # 用自己定义的url加上原consul中url经base62转义的值组成新的url
            new_url = url_ + base62.encodebytes(txt_[0].encode()) 
            consul_txt1 = re.sub(pattern, new_url, i)
            consul_txt = consul_txt + consul_txt1 + "\n"
        self.update_consul(bytes(consul_txt, encoding='utf8')) # 更新consul配置
        return True

    def get_consul_data(self):
        # 获取consul配置数据
        _, data = self.client.kv.get(self.router_adds)
        return str(data["Value"], encoding="utf8")

    def update_consul(self, txt):
        # 更新consul配置数据
        response = self.client.kv.put(self.router_adds, value=str(txt, encoding="utf8").encode("utf-8"))
        assert response is True

ConsulInfo("xxxx:debug/data").replace_url("http://22.33.44.55:345/")

运行前:
在这里插入图片描述

运行后:
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-22 20:40:12  更:2022-02-22 20:40:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 0:16:16-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码