[Python知识库]flask 外部连接代码片段

# 根据环境变量信息去选择配置信息
env = os.getenv('ENV', 'dev')
assert env in ['dev', 'st', 'uat', 'prd']
Config = locals().get(env.capitalize())

# MYSQL ?sqlalchemy
from retrying import retry
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

# 配置
? ? Mysql = {
? ? ? ? 'url': 'mysql+pymysql://cmbuser:Cmb%402018@',
? ? ? ? 'pool_recycle': 180,
? ? ? ? 'pool_size': 10,
? ? ? ? 'pool_timeout': 200,
? ? ? ? 'echo_pool': True,
? ? ? ? 'pool_pre_ping': True,
? ? ? ? 'pool_use_lifo': True
? ? }

class Session:
? ? def __init__(self):
? ? ? ? """延迟实例化"""
? ? ? ? self.scoped_session = None

? ? def init_config(self, *args, **config):
? ? ? ? engine = create_engine(*args, **config)
? ? ? ? session_factory = sessionmaker(bind=engine)
? ? ? ? self.scoped_session = scoped_session(session_factory)

? ? def execute_sql(self, sql):
? ? ? ? ss = self.scoped_session()
? ? ? ? try:
? ? ? ? ? ? return ss.execute(sql)
? ? ? ? finally:
? ? ? ? ? ? ss.close()

def with_db_session(func):
? ? def decorate(*args, **kwargs):
? ? ? ? try:
? ? ? ? ? ? ss = session.scoped_session()
? ? ? ? ? ? return func(*args, **kwargs, ss=ss)
? ? ? ? finally:
? ? ? ? ? ? ss.close()
? ? return decorate

# 三次尝试 @retry
def ensure_execute(ss, sql):
? ? try:
? ? ? ? resp = ss.execute(sql)
? ? ? ? ss.commit()
? ? ? ? return resp
? ? except Exception as e:
? ? ? ? ss.rollback()
? ? ? ? raise Exception('mysql 三次尝试 连接丢失')

session = Session() ?# db


class Task(Base):
? ? ? ? id = Column(primary_key=True)


def func(ss=None):
? ? ? ? #查询
? ? ? ? data = ss.execute(sql).first()
? ? ? ? #添加? ? ? ??
? ? ? ? encure_execute(ss,sql)

# redis 集群

# 配置
? ? Redis = {'startup_nodes': [{'host': '', 'port': '6379'},
? ? ? ? {'host': '', 'port': '6379'}],
? ? ? ? ? ? ?'password': 'password',
? ? ? ? ? ? ?'skip_full_coverage_check': True
? ? ? ? ? ? ?}

import rediscluster

class RedisClst:
? ? def __init__(self):
? ? ? ? self.client = None

? ? def init_config(self, **config):
? ? ? ? self.client = rediscluster.RedisCluster(**config)

def acquire_lock(self, lock_key, lock_val, expire_time=100):
? ? ? ? """获取锁
? ? ? ? """
? ? ? ? ret = self.client.set(lock_key, lock_val, ex=expire_time, nx=True)
? ? ? ? if ret:
? ? ? ? ? ? return ret
? ? ? ? return False

? ? def release_lock(self, lock_key):
? ? ? ? """释放锁
? ? ? ? """
? ? ? ? self.client.delete(lock_key) ?# 直接释放锁

# 可以使用多个实例来进行不同的配置
redis = RedisClst()
tj_redis = RedisClst()

# kafa 生产者配置
? ? Kafka = {
? ? ? ? 'hosts': "",
? ? ? ? 'mechanism': 'PLAIN',
? ? ? ? 'protocol': 'SASL_PLAINTEXT',
? ? ? ? 'user': 'user',
? ? ? ? 'pwd': 'userw0rd',
? ? ? ? 'topic': 'msg-sink'
? ? }

from confluent_kafka import Producer as RawKfkProducer

class KfkProducer:

? ? def __init__(self):
? ? ? ? self.topic = None
? ? ? ? self.producer = None

? ? def init_config(self, hosts, topic, user=None, pwd=None, **kwargs):

????????mechanism = kwargs.get('mechanism', 'PLAIN')
? ? ? ? protocol = kwargs.get('protocol', 'SASL_PLAINTEXT')

? ? ? ? if not user:
? ? ? ? ? ? conf = {
? ? ? ? ? ? ? ? 'bootstrap.servers': hosts
? ? ? ? ? ? }
? ? ? ? else:
? ? ? ? ? ? conf = {
? ? ? ? ? ? ? ? 'bootstrap.servers': hosts,
? ? ? ? ? ? ? ? 'sasl.mechanism': mechanism,
? ? ? ? ? ? ? ? 'security.protocol': protocol,
? ? ? ? ? ? ? ? 'sasl.username': user,
? ? ? ? ? ? ? ? 'sasl.password': pwd
? ? ? ? ? ? }
? ? ? ? self.topic = topic
? ? ? ? self.producer = RawKfkProducer(**conf)

? ? def send_msg(self, msg): # 生产者
? ? ? ? if isinstance(msg, dict):
? ? ? ? ? ? msg = json.dumps(msg)
? ? ? ? self.producer.produce(self.topic, value=msg)
? ? ? ? self.producer.flush(timeout=1)

kproducer = KfkProducer()

# 消费者
from confluent_kafka import Consumer


conf = {'':topic_group,'auto.offset.reset':'earliest'}+普通配置

consumer = Consumer(**conf)
msg = consumer.poll()
data_? = msg.value().decode()

# 基于pydantic 做数据格式校验
from pydantic import BaseModel, Field

class _SaveAnalysis(BaseModel):
? ? task_id: str = Field(...)
? ? company_list: List[str] = Field([], title='企业列表')
class SaveAnalysis(BaseModel):
? ? user: User = Field(..., title='用户信息')
? ? data: _SaveAnalysis = Field(..., title='模型参数')
def input(schema):
? ? """输入校验"""
? ? def decorate(f):
? ? ? ? def deco(*args, **kwargs):
? ? ? ? ? ? try:
? ? ? ? ? ? ? ? req_json = request.get_json()
? ? ? ? ? ? ? ? req_json = schema(**req_json).dict(skip_defaults=False)
? ? ? ? ? ? ? ?'request info: url {request.url}, json: {req_json}')
? ? ? ? ? ? except Exception as e:
? ? ? ? ? ? ? ? logger.error(f'参数解析异常:{e}')
? ? ? ? ? ? ? ? raise AppException('参数解析异常, 请检查参数')

? ? ? ? ? ? return f(req_json, *args, **kwargs)
? ? ? ? return deco
? ? return decorate

# 案例
def draw_analysis_board(data):
? ? task_id = data['data']['task_id']
? ? user = data['user']
? ? return result.draw_analysis_board(task_id, user)

?# 统一响应

def make_response(func):
? ? """统一的响应请求"""
? ? @wraps(func)
? ? def decorate(*args, **kwargs):
? ? ? ? code, msg, data = ResponseCode.Unknown.value, '服务异常, 请联系管理人员', {} ?# 默认返回信息
? ? ? ? try:
? ? ? ? ? ? data = func(*args, **kwargs)
? ? ? ? ? ? code, msg = ResponseCode.Success.value, ''
? ? ? ? except AppException as e:
? ? ? ? ? ? logger.error(f'App内部错误: {e}')
? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}')
? ? ? ? ? ? code, msg = ResponseCode.Fail.value, str(e)
? ? ? ? except Exception as e:
? ? ? ? ? ? logger.error(f'系统未知错误: {e}')
? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}')
? ? ? ? finally:
? ? ? ? ? ?'request info: url {request.url} code: {code}')
? ? ? ? ? ? resp = {'code': code, 'msg': msg, 'data': data}
? ? ? ? ? ? return resp
? ? return decorate

# es查询封装,以及查询全部

from elasticsearch import Elasticsearch

class ElasticClient:

? ? def __init__(self):
? ? ? ? self.client = None

? ? def init_config(self, hosts, user=None, password=None):
? ? ? ? if user and password:
? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts, http_auth=(user, password),timeout=500,
? ? ? ? ? ? max_retries=3,retry_on_timeout=True)
? ? ? ? else:
? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts)

? ? def search(self, index, body, params):
? ? ? ? """查询数据量小于1w 的"""
? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}')
? ? ? ? return, body=body, params=params)

? ? def scroll_search(self, index, body, params, size=10000, scroll='5m', timeout='2m', limit=100000):
? ? ? ? """查询全量数据 或 大于1w 条数据的"""

? ? ? ? # 添加默认参数
? ? ? ? filter_path = ['hits.hits._source', '_scroll_id', '']
? ? ? ? _params = {'track_scores': 'false', 'search_type': 'query_then_fetch'}
? ? ? ? params = {**_params, **params}

? ? ? ? def generate_resp(_scroll_id, _datas, _total, _limit):
? ? ? ? ? ? self.client.clear_scroll(scroll_id=_scroll_id)
? ? ? ? ? ? _resp = {'total': total, 'datas': _datas[:limit]}
? ? ? ? ? ? return _resp

? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}, size: {size}, limit: {limit}')
? ? ? ? resp =, body=body, params=params, scroll=scroll, timeout=timeout, size=size,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? filter_path=filter_path)
? ? ? ? scroll_id = resp['_scroll_id']
? ? ? ? total = resp['hits']['total']
? ? ? ? datas = [data['_source'] for data in resp.get('hits', {}).get('hits', [])]
? ? ? ? if not datas:
? ? ? ? ? ? logger.warning('no data to translate. exit')
? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? ? ? if len(datas) >= limit:
? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? ? ? while True:
? ? ? ? ? ? res = self.client.scroll(scroll_id=scroll_id, scroll=scroll)
? ? ? ? ? ? page_data = res.get('hits', {}).get('hits', [])
? ? ? ? ? ? if not page_data:
? ? ? ? ? ? ? ? break
? ? ? ? ? ? datas.extend([item['_source'] for item in page_data])

? ? ? ? ? ? if len(datas) >= limit:
? ? ? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)
? ? ? ? return generate_resp(scroll_id, datas, total, limit)

? ? def count(self, index, body):
? ? ? ? logger.debug(f'index: {index}, body: {body}')
? ? ? ? resp = self.client.count(index=index, body=body)
? ? ? ? logger.debug(f'index: {index}, body: {body}, count: {resp["count"]}')
? ? ? ? return resp['count']

?# 配置信息

conf = {'host':{'host':ip'','port':端口},'user':None,'password':None}

?# ?分片返回
def slice_iter(data, size=5000): ? ?? ?
????????times = len(data) // size + 1 ? ?
????????for t in range(times): ? ? ? ?
????????????????item = data[t * size: (t + 1) * size] ? ? ? ?
????????if item: ? ? ? ? ? ?
????????????????yield item

?# 动态导入

from importlib import import_model

def register_buleprint(app,blueprints):
? ? ? ? """动态蓝图注册"""
? ? ? ? for blueprint_name in blueprints:
? ? ? ? ? ? ? ? bp_module =?import_model(blueprint_name)
? ? ? ? ? ? ? ? app.register_blueprint(

【Python】 14-CVS文件操作
加:2022-03-03 16:08:24  更:2022-03-03 16:12:00 
