IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Python知识库 -> flask 外部连接代码片段 -> 正文阅读

[Python知识库]flask 外部连接代码片段

# 根据环境变量信息去选择配置信息
env = os.getenv('ENV', 'dev')
assert env in ['dev', 'st', 'uat', 'prd']
Config = locals().get(env.capitalize())

# MYSQL ?sqlalchemy
from retrying import retry
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# 配置
? ? Mysql = {
? ? ? ? 'url': 'mysql+pymysql://cmbuser:Cmb%402018@99.13.211.225:3306/tianji_ai',
? ? ? ? 'pool_recycle': 180,
? ? ? ? 'pool_size': 10,
? ? ? ? 'pool_timeout': 200,
? ? ? ? 'echo_pool': True,
? ? ? ? 'pool_pre_ping': True,
? ? ? ? 'pool_use_lifo': True
? ? }

class Session:
? ? def __init__(self):
? ? ? ? """延迟实例化"""
? ? ? ? self.scoped_session = None

? ? def init_config(self, *args, **config):
? ? ? ? engine = create_engine(*args, **config)
? ? ? ? session_factory = sessionmaker(bind=engine)
? ? ? ? self.scoped_session = scoped_session(session_factory)

? ? def execute_sql(self, sql):
? ? ? ? ss = self.scoped_session()
? ? ? ? try:
? ? ? ? ? ? return ss.execute(sql)
? ? ? ? finally:
? ? ? ? ? ? ss.close()

def with_db_session(func):
? ? def decorate(*args, **kwargs):
? ? ? ? try:
? ? ? ? ? ? ss = session.scoped_session()
? ? ? ? ? ? return func(*args, **kwargs, ss=ss)
? ? ? ? finally:
? ? ? ? ? ? ss.close()
? ? return decorate

# 三次尝试 @retry
@retry(stop_max_attempt_number=3)
def ensure_execute(ss, sql):
? ? try:
? ? ? ? resp = ss.execute(sql)
? ? ? ? ss.commit()
? ? ? ? return resp
? ? except Exception as e:
? ? ? ? ss.rollback()
? ? ? ? raise Exception('mysql 三次尝试 连接丢失')


session = Session() ?# db

ORM-使用:

class Task(Base):
????????__tablename__='tablename'
? ? ? ? id = Column(primary_key=True)

原生sql:

@with_db_session
def func(ss=None):
? ? ? ? #查询
? ? ? ? data = ss.execute(sql).first()
? ? ? ? #添加? ? ? ??
? ? ? ? encure_execute(ss,sql)

# redis 集群


# 配置
? ? Redis = {'startup_nodes': [{'host': '0.0.0.0', 'port': '6379'},
? ? ? ? {'host': '0.0.0.0', 'port': '6379'}],
? ? ? ? ? ? ?'password': 'password',
? ? ? ? ? ? ?'skip_full_coverage_check': True
? ? ? ? ? ? ?}

import rediscluster


class RedisClst:
? ? def __init__(self):
? ? ? ? self.client = None

? ? def init_config(self, **config):
? ? ? ? self.client = rediscluster.RedisCluster(**config)

def acquire_lock(self, lock_key, lock_val, expire_time=100):
? ? ? ? """获取锁
? ? ? ? """
? ? ? ? ret = self.client.set(lock_key, lock_val, ex=expire_time, nx=True)
? ? ? ? if ret:
? ? ? ? ? ? return ret
? ? ? ? return False

? ? def release_lock(self, lock_key):
? ? ? ? """释放锁
? ? ? ? """
? ? ? ? self.client.delete(lock_key) ?# 直接释放锁

# 可以使用多个实例来进行不同的配置
redis = RedisClst()
tj_redis = RedisClst()

# kafa 生产者配置
? ? Kafka = {
? ? ? ? 'hosts': "",
? ? ? ? 'mechanism': 'PLAIN',
? ? ? ? 'protocol': 'SASL_PLAINTEXT',
? ? ? ? 'user': 'user',
? ? ? ? 'pwd': 'userw0rd',
? ? ? ? 'topic': 'msg-sink'
? ? }

from confluent_kafka import Producer as RawKfkProducer


class KfkProducer:

? ? def __init__(self):
? ? ? ? self.topic = None
? ? ? ? self.producer = None

? ? def init_config(self, hosts, topic, user=None, pwd=None, **kwargs):

????????mechanism = kwargs.get('mechanism', 'PLAIN')
? ? ? ? protocol = kwargs.get('protocol', 'SASL_PLAINTEXT')

? ? ? ? if not user:
? ? ? ? ? ? conf = {
? ? ? ? ? ? ? ? 'bootstrap.servers': hosts
? ? ? ? ? ? }
? ? ? ? else:
? ? ? ? ? ? conf = {
? ? ? ? ? ? ? ? 'bootstrap.servers': hosts,
? ? ? ? ? ? ? ? 'sasl.mechanism': mechanism,
? ? ? ? ? ? ? ? 'security.protocol': protocol,
? ? ? ? ? ? ? ? 'sasl.username': user,
? ? ? ? ? ? ? ? 'sasl.password': pwd
? ? ? ? ? ? }
? ? ? ? self.topic = topic
? ? ? ? self.producer = RawKfkProducer(**conf)

? ? def send_msg(self, msg): # 生产者
? ? ? ? if isinstance(msg, dict):
? ? ? ? ? ? msg = json.dumps(msg)
? ? ? ? self.producer.produce(self.topic, value=msg)
? ? ? ? self.producer.flush(timeout=1)


kproducer = KfkProducer()

# 消费者
from confluent_kafka import Consumer

消费配置:

conf = {'group.id':topic_group,'auto.offset.reset':'earliest'}+普通配置

consumer = Consumer(**conf)
consumer.subscribe([topic_name])
msg = consumer.poll()
data_? = msg.value().decode()

# 基于pydantic 做数据格式校验
from pydantic import BaseModel, Field

class _SaveAnalysis(BaseModel):
? ? task_id: str = Field(...)
? ? company_list: List[str] = Field([], title='企业列表')
?
?
class SaveAnalysis(BaseModel):
? ? user: User = Field(..., title='用户信息')
? ? data: _SaveAnalysis = Field(..., title='模型参数')
?
def input(schema):
? ? """输入校验"""
? ? def decorate(f):
? ? ? ? def deco(*args, **kwargs):
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? req_json = request.get_json()
? ? ? ? ? ? ? ? req_json = schema(**req_json).dict(skip_defaults=False)
? ? ? ? ? ? ? ? logger.info(f'request info: url {request.url}, json: {req_json}')
? ? ? ? ? ? except Exception as e:
? ? ? ? ? ? ? ? logger.error(f'参数解析异常:{e}')
? ? ? ? ? ? ? ? raise AppException('参数解析异常, 请检查参数')

? ? ? ? ? ? return f(req_json, *args, **kwargs)
? ? ? ? return deco
? ? return decorate

# 案例
@make_response
@input(analysis_schema.DrawAnalysis)
def draw_analysis_board(data):
? ? task_id = data['data']['task_id']
? ? user = data['user']
? ? return result.draw_analysis_board(task_id, user)

?# 统一响应

def make_response(func):
? ? """统一的响应请求"""
? ? @wraps(func)
? ? def decorate(*args, **kwargs):
? ? ? ? code, msg, data = ResponseCode.Unknown.value, '服务异常, 请联系管理人员', {} ?# 默认返回信息
? ? ? ? try:
? ? ? ? ? ? data = func(*args, **kwargs)
? ? ? ? ? ? code, msg = ResponseCode.Success.value, ''
? ? ? ? except AppException as e:
? ? ? ? ? ? logger.error(f'App内部错误: {e}')
? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}')
? ? ? ? ? ? code, msg = ResponseCode.Fail.value, str(e)
? ? ? ? except Exception as e:
? ? ? ? ? ? logger.error(f'系统未知错误: {e}')
? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}')
? ? ? ? finally:
? ? ? ? ? ? logger.info(f'request info: url {request.url} code: {code}')
? ? ? ? ? ? resp = {'code': code, 'msg': msg, 'data': data}
? ? ? ? ? ? return resp
? ? return decorate

# es查询封装,以及查询全部

from elasticsearch import Elasticsearch


class ElasticClient:

? ? def __init__(self):
? ? ? ? self.client = None

? ? def init_config(self, hosts, user=None, password=None):
? ? ? ? if user and password:
? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts, http_auth=(user, password),timeout=500,
? ? ? ? ? ? max_retries=3,retry_on_timeout=True)
? ? ? ? else:
? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts)

? ? def search(self, index, body, params):
? ? ? ? """查询数据量小于1w 的"""
? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}')
? ? ? ? return self.client.search(index=index, body=body, params=params)

? ? def scroll_search(self, index, body, params, size=10000, scroll='5m', timeout='2m', limit=100000):
? ? ? ? """查询全量数据 或 大于1w 条数据的"""

? ? ? ? # 添加默认参数
? ? ? ? filter_path = ['hits.hits._source', '_scroll_id', 'hits.total']
? ? ? ? _params = {'track_scores': 'false', 'search_type': 'query_then_fetch'}
? ? ? ? params = {**_params, **params}

? ? ? ? def generate_resp(_scroll_id, _datas, _total, _limit):
? ? ? ? ? ? self.client.clear_scroll(scroll_id=_scroll_id)
? ? ? ? ? ? _resp = {'total': total, 'datas': _datas[:limit]}
? ? ? ? ? ? return _resp

? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}, size: {size}, limit: {limit}')
? ? ? ? resp = self.client.search(index=index, body=body, params=params, scroll=scroll, timeout=timeout, size=size,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? filter_path=filter_path)
? ? ? ? scroll_id = resp['_scroll_id']
? ? ? ? total = resp['hits']['total']
? ? ? ? datas = [data['_source'] for data in resp.get('hits', {}).get('hits', [])]
? ? ? ? if not datas:
? ? ? ? ? ? logger.warning('no data to translate. exit')
? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? ? ? if len(datas) >= limit:
? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? ? ? while True:
? ? ? ? ? ? res = self.client.scroll(scroll_id=scroll_id, scroll=scroll)
? ? ? ? ? ? page_data = res.get('hits', {}).get('hits', [])
? ? ? ? ? ? if not page_data:
? ? ? ? ? ? ? ? break
? ? ? ? ? ? datas.extend([item['_source'] for item in page_data])

? ? ? ? ? ? if len(datas) >= limit:
? ? ? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)
? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? def count(self, index, body):
? ? ? ? logger.debug(f'index: {index}, body: {body}')
? ? ? ? resp = self.client.count(index=index, body=body)
? ? ? ? logger.debug(f'index: {index}, body: {body}, count: {resp["count"]}')
? ? ? ? return resp['count']

?# 配置信息

conf = {'host':{'host':ip'','port':端口},'user':None,'password':None}
配置时需要**

?# ?分片返回
def slice_iter(data, size=5000): ? ?? ?
????????times = len(data) // size + 1 ? ?
????????for t in range(times): ? ? ? ?
????????????????item = data[t * size: (t + 1) * size] ? ? ? ?
????????if item: ? ? ? ? ? ?
????????????????yield item

?# 动态导入

from importlib import import_model

def register_buleprint(app,blueprints):
? ? ? ? """动态蓝图注册"""
? ? ? ? for blueprint_name in blueprints:
? ? ? ? ? ? ? ? bp_module =?import_model(blueprint_name)
? ? ? ? ? ? ? ? app.register_blueprint(bp_module.bq)

  Python知识库 最新文章
Python中String模块
【Python】 14-CVS文件操作
python的panda库读写文件
使用Nordic的nrf52840实现蓝牙DFU过程
【Python学习记录】numpy数组用法整理
Python学习笔记
python字符串和列表
python如何从txt文件中解析出有效的数据
Python编程从入门到实践自学/3.1-3.2
python变量
上一篇文章      下一篇文章      查看所有文章
加:2022-03-03 16:08:24  更:2022-03-03 16:12:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/13 13:33:33-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码