config.py:
SQLALCHEMY_BINDS = {
"db1": "mysql+pymysql://db_user1:db_pwd1@db_host1:db_port1/db_name1?charset=utf8",
"db2": "mysql+pymysql://db_user2:db_pwd2@db_host2:db_port2/db_name2?charset=utf8",
}
models.py:
def dynamic_model(bind_key, model_n):
properties = {
"__tablename__" = model_n.__tablename__,
"__bind_key__" = bind_key
}
for col in dir(model_n):
if col.startswith('_'):
continue
properties[col] = getattr(model_n, col)
model = type(model_n.__tablename__, (db.Model,), properties)
return model
自定义序列化:
class Serialize:
def __init__(self, model):
self.query = model.query
self.model = model
self.columns = []
self.page_size = request.args.get('page_size', type=int, default=10)
self.current_page = request.args.get('page', default=1, type=int)
self.start = (self.current_page - 1) * self.page_size
self.end = self.start + self.page_size
self.field_func_map = {}
def field(self, *columns):
self.columns = columns
self.query = db.session.query(*[getattr(self.model, column) for column in columns])
return self
def search(self, **kw_map):
for column in kw_map:
if kw_map[column]:
self.query = self.query.filter(
getattr(self.model, column).like("%{}%".format('/' + kw_map[column]), escape='/'))
return self
def exclude(self, **kw_map):
for column in kw_map:
self.query = self.query.filter(
getattr(self.model, column) != kw_map[column])
return self
def custom_handle(self, **kargs):
self.field_func_map = kargs
return self
def to_json(self, **kw_map):
res = []
for column in kw_map:
self.query = self.query.filter(
getattr(self.model, column) == kw_map[column])
if self.columns:
for q in self.query.slice(self.start, self.end):
obj_dict = {}
for i, c in enumerate(self.columns):
if c in self.field_func_map:
obj_dict[c] = self.field_func_map[c.name](q[i])
else:
obj_dict[c] = q[i]
res.append(obj_dict)
else:
for q in self.query.slice(self.start, self.end):
obj_dict = {}
for c in class_mapper(q.__class__).columns:
if c.name in self.field_func_map:
obj_dict[c.name] = self.field_func_map[c.name](getattr(q, c.name))
else:
obj_dict[c.name] = getattr(q, c.name)
res.append(obj_dict)
return res
def total(self):
return self.query.count()
def data(self, **kw_map):
return {
'data': self.to_json(**kw_map),
'total': self.total(),
'currentPage': self.current_page,
'pageSize': self.page_size
}
|