安装:
pip install python-consul
pip install pybase62
import re
import base62
import consul
class ConsulInfo:
    """Rewrite the URLs stored in one Consul KV entry.

    The entry at ``config/<router_adds>`` is read as UTF-8 text. Every URL in
    it that does not already start with a given prefix is replaced by
    ``prefix + base62(original URL)``, and the result is written back to the
    same key.
    """

    # Loose URL matcher, compiled once instead of once per processed line.
    _URL_PATTERN = re.compile(
        r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
    )

    def __init__(self, router_adds, *, host="consul.xxxx.local", port="xxxx", token="xxxx"):
        """Create a Consul client bound to the key ``config/<router_adds>``.

        Args:
            router_adds: Key path below ``config/`` that holds the text to rewrite.
            host: Consul server host name.
            port: Consul server port.
            token: Consul ACL token used for the requests.
        """
        self.host = host
        self.port = port
        self.token = token
        self.router_adds = "config/" + router_adds
        # NOTE(review): verify=False turns off TLS certificate verification;
        # kept as in the original, but confirm this is acceptable for the
        # environment this runs against.
        self.client = consul.Consul(self.host, self.port, token=self.token, verify=False)

    def replace_url(self, url_):
        """Prefix every foreign URL in the entry with ``url_`` and store the result.

        Each URL that does not already start with ``url_`` becomes
        ``url_ + base62(original URL)``. URLs that already start with ``url_``
        are left untouched, so running the method again does not re-encode them.

        Args:
            url_: Prefix that rewritten URLs must start with.

        Returns:
            False when the key is missing or empty (nothing is written),
            True after the rewritten text has been written back.

        Raises:
            AssertionError: If Consul does not acknowledge the write.
        """
        consul_data = self.get_consul_data()
        if not consul_data:
            return False

        def _rewrite(match):
            # A callable replacement handles each URL on its own (a line may
            # hold several) and keeps backslashes in url_ from being read as
            # regex template escapes.
            found = match.group(0)
            if found.startswith(url_):
                return found
            return url_ + base62.encodebytes(found.encode())

        # Every line is terminated with "\n", including the last one.
        rewritten = "".join(
            self._URL_PATTERN.sub(_rewrite, line) + "\n"
            for line in consul_data.splitlines()
        )
        self.update_consul(rewritten.encode("utf-8"))
        return True

    def get_consul_data(self):
        """Return the entry's value decoded as UTF-8 text.

        Returns:
            The decoded text, or an empty string when the key does not exist
            or holds no value.
        """
        _, data = self.client.kv.get(self.router_adds)
        # kv.get yields None for a missing key, and "Value" is None for an
        # empty one; report both as "no data" instead of raising TypeError.
        if data is None or data.get("Value") is None:
            return ""
        return data["Value"].decode("utf-8")

    def update_consul(self, txt):
        """Write ``txt`` to the entry.

        Args:
            txt: New content as UTF-8 encoded bytes (a ``str`` is also
                accepted and encoded as UTF-8).

        Raises:
            AssertionError: If the put call does not return True.
        """
        value = txt.encode("utf-8") if isinstance(txt, str) else txt
        response = self.client.kv.put(self.router_adds, value=value)
        # Raised explicitly (same exception type as before) so the check is
        # not stripped when Python runs with -O.
        if response is not True:
            raise AssertionError(
                "Consul rejected the update of key %r" % (self.router_adds,)
            )
# Example run: rewrite the URLs stored under config/xxxx:debug/data so that
# each one starts with the given address.
ConsulInfo(router_adds="xxxx:debug/data").replace_url(url_="http://22.33.44.55:345/")
运行前:
运行后:
|