# 根据环境变量信息去选择配置信息 env = os.getenv('ENV', 'dev') assert env in ['dev', 'st', 'uat', 'prd'] Config = locals().get(env.capitalize())
# MYSQL ?sqlalchemy from retrying import retry from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker, scoped_session from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
# 配置 ? ? Mysql = { ? ? ? ? 'url': 'mysql+pymysql://cmbuser:Cmb%402018@99.13.211.225:3306/tianji_ai', ? ? ? ? 'pool_recycle': 180, ? ? ? ? 'pool_size': 10, ? ? ? ? 'pool_timeout': 200, ? ? ? ? 'echo_pool': True, ? ? ? ? 'pool_pre_ping': True, ? ? ? ? 'pool_use_lifo': True ? ? }
class Session: ? ? def __init__(self): ? ? ? ? """延迟实例化""" ? ? ? ? self.scoped_session = None
? ? def init_config(self, *args, **config): ? ? ? ? engine = create_engine(*args, **config) ? ? ? ? session_factory = sessionmaker(bind=engine) ? ? ? ? self.scoped_session = scoped_session(session_factory)
? ? def execute_sql(self, sql): ? ? ? ? ss = self.scoped_session() ? ? ? ? try: ? ? ? ? ? ? return ss.execute(sql) ? ? ? ? finally: ? ? ? ? ? ? ss.close()
def with_db_session(func): ? ? def decorate(*args, **kwargs): ? ? ? ? try: ? ? ? ? ? ? ss = session.scoped_session() ? ? ? ? ? ? return func(*args, **kwargs, ss=ss) ? ? ? ? finally: ? ? ? ? ? ? ss.close() ? ? return decorate
# 三次尝试 @retry @retry(stop_max_attempt_number=3) def ensure_execute(ss, sql): ? ? try: ? ? ? ? resp = ss.execute(sql) ? ? ? ? ss.commit() ? ? ? ? return resp ? ? except Exception as e: ? ? ? ? ss.rollback() ? ? ? ? raise Exception('mysql 三次尝试 连接丢失')
session = Session() ?# db
ORM-使用:
class Task(Base): ????????__tablename__='tablename' ? ? ? ? id = Column(primary_key=True)
原生sql:
@with_db_session def func(ss=None): ? ? ? ? #查询 ? ? ? ? data = ss.execute(sql).first() ? ? ? ? #添加? ? ? ?? ? ? ? ? encure_execute(ss,sql)
# redis 集群
# 配置 ? ? Redis = {'startup_nodes': [{'host': '0.0.0.0', 'port': '6379'}, ? ? ? ? {'host': '0.0.0.0', 'port': '6379'}], ? ? ? ? ? ? ?'password': 'password', ? ? ? ? ? ? ?'skip_full_coverage_check': True ? ? ? ? ? ? ?}
import rediscluster
class RedisClst: ? ? def __init__(self): ? ? ? ? self.client = None
? ? def init_config(self, **config): ? ? ? ? self.client = rediscluster.RedisCluster(**config)
def acquire_lock(self, lock_key, lock_val, expire_time=100): ? ? ? ? """获取锁 ? ? ? ? """ ? ? ? ? ret = self.client.set(lock_key, lock_val, ex=expire_time, nx=True) ? ? ? ? if ret: ? ? ? ? ? ? return ret ? ? ? ? return False
? ? def release_lock(self, lock_key): ? ? ? ? """释放锁 ? ? ? ? """ ? ? ? ? self.client.delete(lock_key) ?# 直接释放锁
# 可以使用多个实例来进行不同的配置 redis = RedisClst() tj_redis = RedisClst()
# kafa 生产者配置 ? ? Kafka = { ? ? ? ? 'hosts': "", ? ? ? ? 'mechanism': 'PLAIN', ? ? ? ? 'protocol': 'SASL_PLAINTEXT', ? ? ? ? 'user': 'user', ? ? ? ? 'pwd': 'userw0rd', ? ? ? ? 'topic': 'msg-sink' ? ? }
from confluent_kafka import Producer as RawKfkProducer
class KfkProducer:
? ? def __init__(self): ? ? ? ? self.topic = None ? ? ? ? self.producer = None
? ? def init_config(self, hosts, topic, user=None, pwd=None, **kwargs):
????????mechanism = kwargs.get('mechanism', 'PLAIN') ? ? ? ? protocol = kwargs.get('protocol', 'SASL_PLAINTEXT')
? ? ? ? if not user: ? ? ? ? ? ? conf = { ? ? ? ? ? ? ? ? 'bootstrap.servers': hosts ? ? ? ? ? ? } ? ? ? ? else: ? ? ? ? ? ? conf = { ? ? ? ? ? ? ? ? 'bootstrap.servers': hosts, ? ? ? ? ? ? ? ? 'sasl.mechanism': mechanism, ? ? ? ? ? ? ? ? 'security.protocol': protocol, ? ? ? ? ? ? ? ? 'sasl.username': user, ? ? ? ? ? ? ? ? 'sasl.password': pwd ? ? ? ? ? ? } ? ? ? ? self.topic = topic ? ? ? ? self.producer = RawKfkProducer(**conf)
? ? def send_msg(self, msg): # 生产者 ? ? ? ? if isinstance(msg, dict): ? ? ? ? ? ? msg = json.dumps(msg) ? ? ? ? self.producer.produce(self.topic, value=msg) ? ? ? ? self.producer.flush(timeout=1)
kproducer = KfkProducer()
# 消费者 from confluent_kafka import Consumer
消费配置:
conf = {'group.id':topic_group,'auto.offset.reset':'earliest'}+普通配置
consumer = Consumer(**conf) consumer.subscribe([topic_name]) msg = consumer.poll() data_? = msg.value().decode()
# 基于pydantic 做数据格式校验 from pydantic import BaseModel, Field
class _SaveAnalysis(BaseModel): ? ? task_id: str = Field(...) ? ? company_list: List[str] = Field([], title='企业列表') ? ? class SaveAnalysis(BaseModel): ? ? user: User = Field(..., title='用户信息') ? ? data: _SaveAnalysis = Field(..., title='模型参数') ? def input(schema): ? ? """输入校验""" ? ? def decorate(f): ? ? ? ? def deco(*args, **kwargs): ? ? ? ? ? ? try: ? ? ? ? ? ? ? ? req_json = request.get_json() ? ? ? ? ? ? ? ? req_json = schema(**req_json).dict(skip_defaults=False) ? ? ? ? ? ? ? ? logger.info(f'request info: url {request.url}, json: {req_json}') ? ? ? ? ? ? except Exception as e: ? ? ? ? ? ? ? ? logger.error(f'参数解析异常:{e}') ? ? ? ? ? ? ? ? raise AppException('参数解析异常, 请检查参数')
? ? ? ? ? ? return f(req_json, *args, **kwargs) ? ? ? ? return deco ? ? return decorate
# 案例 @make_response @input(analysis_schema.DrawAnalysis) def draw_analysis_board(data): ? ? task_id = data['data']['task_id'] ? ? user = data['user'] ? ? return result.draw_analysis_board(task_id, user)
?# 统一响应
def make_response(func): ? ? """统一的响应请求""" ? ? @wraps(func) ? ? def decorate(*args, **kwargs): ? ? ? ? code, msg, data = ResponseCode.Unknown.value, '服务异常, 请联系管理人员', {} ?# 默认返回信息 ? ? ? ? try: ? ? ? ? ? ? data = func(*args, **kwargs) ? ? ? ? ? ? code, msg = ResponseCode.Success.value, '' ? ? ? ? except AppException as e: ? ? ? ? ? ? logger.error(f'App内部错误: {e}') ? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}') ? ? ? ? ? ? code, msg = ResponseCode.Fail.value, str(e) ? ? ? ? except Exception as e: ? ? ? ? ? ? logger.error(f'系统未知错误: {e}') ? ? ? ? ? ? logger.error(f'错误详情: {traceback.format_exc()}') ? ? ? ? finally: ? ? ? ? ? ? logger.info(f'request info: url {request.url} code: {code}') ? ? ? ? ? ? resp = {'code': code, 'msg': msg, 'data': data} ? ? ? ? ? ? return resp ? ? return decorate
# es查询封装,以及查询全部
from elasticsearch import Elasticsearch
class ElasticClient:
? ? def __init__(self): ? ? ? ? self.client = None
? ? def init_config(self, hosts, user=None, password=None): ? ? ? ? if user and password: ? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts, http_auth=(user, password),timeout=500, ? ? ? ? ? ? max_retries=3,retry_on_timeout=True) ? ? ? ? else: ? ? ? ? ? ? self.client = Elasticsearch(hosts=hosts)
? ? def search(self, index, body, params): ? ? ? ? """查询数据量小于1w 的""" ? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}') ? ? ? ? return self.client.search(index=index, body=body, params=params)
? ? def scroll_search(self, index, body, params, size=10000, scroll='5m', timeout='2m', limit=100000): ? ? ? ? """查询全量数据 或 大于1w 条数据的"""
? ? ? ? # 添加默认参数 ? ? ? ? filter_path = ['hits.hits._source', '_scroll_id', 'hits.total'] ? ? ? ? _params = {'track_scores': 'false', 'search_type': 'query_then_fetch'} ? ? ? ? params = {**_params, **params}
? ? ? ? def generate_resp(_scroll_id, _datas, _total, _limit): ? ? ? ? ? ? self.client.clear_scroll(scroll_id=_scroll_id) ? ? ? ? ? ? _resp = {'total': total, 'datas': _datas[:limit]} ? ? ? ? ? ? return _resp
? ? ? ? logger.debug(f'index: {index}, body: {body}, params: {params}, size: {size}, limit: {limit}') ? ? ? ? resp = self.client.search(index=index, body=body, params=params, scroll=scroll, timeout=timeout, size=size, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? filter_path=filter_path) ? ? ? ? scroll_id = resp['_scroll_id'] ? ? ? ? total = resp['hits']['total'] ? ? ? ? datas = [data['_source'] for data in resp.get('hits', {}).get('hits', [])] ? ? ? ? if not datas: ? ? ? ? ? ? logger.warning('no data to translate. exit') ? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)
? ? ? ? if len(datas) >= limit: ? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit)
? ? ? ? while True: ? ? ? ? ? ? res = self.client.scroll(scroll_id=scroll_id, scroll=scroll) ? ? ? ? ? ? page_data = res.get('hits', {}).get('hits', []) ? ? ? ? ? ? if not page_data: ? ? ? ? ? ? ? ? break ? ? ? ? ? ? datas.extend([item['_source'] for item in page_data])
? ? ? ? ? ? if len(datas) >= limit: ? ? ? ? ? ? ? ? return generate_resp(scroll_id, datas, total, limit) ? ? ? ? return generate_resp(scroll_id, datas, total, limit)
? ? def count(self, index, body): ? ? ? ? logger.debug(f'index: {index}, body: {body}') ? ? ? ? resp = self.client.count(index=index, body=body) ? ? ? ? logger.debug(f'index: {index}, body: {body}, count: {resp["count"]}') ? ? ? ? return resp['count']
?# 配置信息
conf = {'host':{'host':ip'','port':端口},'user':None,'password':None} 配置时需要**
?# ?分片返回 def slice_iter(data, size=5000): ? ?? ? ????????times = len(data) // size + 1 ? ? ????????for t in range(times): ? ? ? ? ????????????????item = data[t * size: (t + 1) * size] ? ? ? ? ????????if item: ? ? ? ? ? ? ????????????????yield item
?# 动态导入
from importlib import import_model
def register_buleprint(app,blueprints): ? ? ? ? """动态蓝图注册""" ? ? ? ? for blueprint_name in blueprints: ? ? ? ? ? ? ? ? bp_module =?import_model(blueprint_name) ? ? ? ? ? ? ? ? app.register_blueprint(bp_module.bq)
|