Mysql分布式设计
作用
- 对数据备份, 实现高可用 HA (主要)
- 通过读写分离, 提高吞吐量, 实现高性能
原理
- Mysql的复制 是一个异步的复制过程
- 过程本质为 Slave 从 Master 端获取 Binary Log, 然后再在自己身上完全顺序的执行日志中所记录的各种操作
- MySQL 复制的基本过程如下:
1)Slave 上面的 IO 线程连接上 Master, 并请求从指定日志文件的指定位置之后的日志内容; 2)Master 接收到来自 Slave 的 IO 线程的请求后, 通过负责复制的IO线程 根据请求信息读取日志信息,返回给 Slave 端的 IO 线程。 3)Slave 的 IO 线程接收到信息后,将接收到的日志内容依次写入到 Slave 端的 Relay Log文件 4)Slave 的 SQL 线程检测到 Relay Log 中新增加了内容后,会马上解析该文件中的内容, 并在自身执行这些 原始SQL语句。
注: I/O thread从Binary log中读取到的数据并不是读一条执行一条而是先写入到Relay log中,否则效率会很慢
常用架构
主从架构
- 性能
一主多从, 读写分离, 提高吞吐量 - 可用性
主库单点, 一旦挂了, 无法写入 从库高可用
主备架构
- 性能
单库读写, 性能一般 - 可用性
高可用, 一旦主库挂了, 就启用备库 - 这种方案被阿里云、美团等企业广泛使用
主备架构搭建除了配置双主同步, 还需要搭配第三方故障转移/高可用方案, 属于DBA和运维专业领域
MySQL + Keepalived 双主热备高可用操作记录 我们通常说的双机热备是指两台机器都在运行,但并不是两台机器都同时在提供服务。当提供服务的一台出现故障的时候,另外一台会马上自动接管并且提供服务,而且切换的时间非常短。MySQL双主复制,即互为Master-Slave(只有一个Master提供写操作),可以实现数据库服务器的热备,但是一个Master宕机后不能实现动态切换。使用Keepalived,可以通过虚拟IP,实现双主对外的统一接口以及自动检查、失败切换机制,从而实现MySQL数据库的高可用方案
1)先实施Master->Slave的主主同步。主主是数据双向同步,主从是数据单向同步。一般情况下,主库宕机后,需要手动将连接切换到从库上。(但是用keepalived就可以自动切换)
2)再结合Keepalived的使用,通过VIP实现Mysql双主对外连接的统一接口。即客户端通过Vip连接数据库;当其中一台宕机后,VIP会漂移到另一台上,这个过程对于客户端的数据连接来说几乎无感觉,从而实现高可用。
如果我们基于代码层面而不考虑去安装部署keepalive,只需要在配置访问数据库地址时设置为VIP虚拟IP即可
问题: 既然主备互为备份, 为什么不采用双主方案, 提供两台主进行负载均衡?
- 原因是为了避免数据的冲突,防止造成数据的不一致性。 虽然在两边执行的修改有先后顺序,但由于 Replication 是异步的实现机制,同样可能会导致 晚做的修改被早做的修改所覆盖
高可用复合架构
- 性能
读写分离, 提高吞吐量 - 可用性
高可用, 一旦主库挂了, 就启用备库
读写分离
- sqlalchemy 并没有像 django-orm 一样内置完善的读写分离方案, 但是提供了可以自定义的接口: 官方参考文档, 我们可以借此对 flask-sqlalchemy 进行二次开发, 实现读写分离
engines = {
'leader':create_engine("sqlite:///leader.db"),
'other':create_engine("sqlite:///other.db"),
'follower1':create_engine("sqlite:///follower1.db"),
'follower2':create_engine("sqlite:///follower2.db"),
}
from sqlalchemy.sql import Update, Delete
from sqlalchemy.orm import Session, sessionmaker
import random
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None):
if mapper and issubclass(mapper.class_, MyOtherClass):
return engines['other']
elif self._flushing or isinstance(clause, (Update, Delete)):
return engines['leader']
else:
return engines[
random.choice(['follower1','follower2'])
]
# 通过class_ 这个属性指定自定义的session类
Session = sessionmaker(class_=RoutingSession)
基本实现思路: 实现自定义的 session类, 继承 SignallingSession类
- 重写 get_bind方法, 根据读写需求选择对应的数据库地址
实现自定义的 SQLAlchemy类, 继承 SQLAlchemy类
- 重写 create_session方法, 在内部使用自定义的 Session类
import random
from flask import Flask
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://root:mysql@192.168.105.140:3306/test31'
app.config['SQLALCHEMY_BINDS'] = {
'master': 'mysql://root:mysql@192.168.105.140:3306/test31',
'slave1': 'mysql://root:mysql@192.168.105.140:8306/test31',
'slave2': 'mysql://root:mysql@192.168.105.140:3306/test31'
}
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True
class RoutingSession(SignallingSession):
def __init__(self, db, autocommit=False, autoflush=True, **options):
super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)
self.slave = random.choice(['slave1', 'slave2'])
def get_bind(self, mapper=None, clause=None):
"""每次数据库操作(增删改查及事务操作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""
state = get_state(self.app)
if mapper is not None:
try:
persist_selectable = mapper.persist_selectable
except AttributeError:
persist_selectable = mapper.mapped_table
info = getattr(persist_selectable, 'info', {})
bind_key = info.get('bind_key')
if bind_key is not None:
return state.db.get_engine(self.app, bind=bind_key)
if self._flushing or isinstance(clause, UpdateBase):
print('写操作')
return state.db.get_engine(self.app, bind='master')
else:
print('读操作: ', self.slave)
return state.db.get_engine(self.app, bind=self.slave)
class RoutingSQLAlchemy(SQLAlchemy):
def create_session(self, options):
return orm.sessionmaker(class_=RoutingSession, db=self, **options)
db = RoutingSQLAlchemy(app)
class User(db.Model):
__tablename__ = 't_user'
id = db.Column(db.Integer, primary_key=True)
name = db.Column('username', db.String(20), unique=True)
age = db.Column(db.Integer, default=0, index=True)
@app.route('/')
def index():
"""增加数据"""
print('---读-----------')
users = User.query.all()
for user in users:
print(user.id, user.name, user.age)
print('---读-----------')
users = User.query.all()
for user in users:
print(user.id, user.name, user.age)
return "index"
if __name__ == '__main__':
db.drop_all()
db.create_all()
app.run(debug=True, host='0.0.0.0')
项目集成
- 将工具包routing_db 导入 common/models中 , 其中的 routing_sqlalchemy.py文件实现了读写分离
import random
from flask_sqlalchemy import SQLAlchemy, SignallingSession, get_state
from sqlalchemy import orm
from sqlalchemy.sql.dml import UpdateBase
class RoutingSession(SignallingSession):
"""自定义Session类, 继承SignallingSession"""
def __init__(self, db, autocommit=False, autoflush=True, **options):
super(RoutingSession, self).__init__(db, autocommit, autoflush, **options)
self.slave = random.choice(['slave1', 'slave2'])
def get_bind(self, mapper=None, clause=None):
"""每次数据库操作(增删改查及事务操作)都会调用该方法, 来获取对应的数据库引擎(访问的数据库)"""
state = get_state(self.app)
if mapper is not None:
try:
persist_selectable = mapper.persist_selectable
except AttributeError:
persist_selectable = mapper.mapped_table
info = getattr(persist_selectable, 'info', {})
bind_key = info.get('bind_key')
if bind_key is not None:
return state.db.get_engine(self.app, bind=bind_key)
if self._flushing or isinstance(clause, UpdateBase):
print('写操作')
return state.db.get_engine(self.app, bind='master')
else:
print('读操作: ', self.slave)
return state.db.get_engine(self.app, bind=self.slave)
class RoutingSQLAlchemy(SQLAlchemy):
"""自定义SQLALchemy类"""
def create_session(self, options):
"""重写create_session方法: 使用自定义Session类"""
return orm.sessionmaker(class_=RoutingSession, db=self, **options)
- 在 app/settings/config.py文件中 设置主从数据库的URI地址
class DefaultConfig:
"""默认配置"""
...
SQLALCHEMY_BINDS = {
"master": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
"slave1": 'mysql://root:mysql@192.168.105.140:3306/hm_topnews',
"slave2": 'mysql://root:mysql@192.168.105.140:8306/hm_topnews'
}
...
- 在 app/init.py文件 中使用自定义SQLAlchemy类
...
from models.routing_db.routing_sqlalchemy import RoutingSQLAlchemy
db = RoutingSQLAlchemy()
...
优化 修改前
class LoginResource(Resource):
"""注册登录"""
def post(self):
parser = RequestParser()
parser.add_argument('mobile', required=True, location='json', type=mobile_type)
parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
args = parser.parse_args()
mobile = args.mobile
code = args.code
key = 'app:code:{}'.format(mobile)
real_code = redis_client.get(key)
if not real_code or real_code != code:
return {'message': 'Invalid Code', 'data': None}, 400
user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()
if user:
user.last_login = datetime.now()
else:
user = User(mobile=mobile, name=mobile, last_login=datetime.now())
db.session.add(user)
db.session.commit()
token = generate_jwt({'userid': user.id},
expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))
return {'token': token}, 201
修改后
class LoginResource(Resource):
"""注册登录"""
def post(self):
parser = RequestParser()
parser.add_argument('mobile', required=True, location='json', type=mobile_type)
parser.add_argument('code', required=True, location='json', type=regex(r'^\d{6}$'))
args = parser.parse_args()
mobile = args.mobile
code = args.code
key = 'app:code:{}'.format(mobile)
real_code = redis_client.get(key)
if not real_code or real_code != code:
return {'message': 'Invalid Code', 'data': None}, 400
user = User.query.options(load_only(User.id)).filter(User.mobile == mobile).first()
if user:
user.last_login = datetime.now()
else:
user = User(mobile=mobile, name=mobile, last_login=datetime.now())
db.session.add(user)
db.session.flush()
userid = user.id
db.session.commit()
token = generate_jwt({'userid': userid},
expiry=datetime.utcnow() + timedelta(days=current_app.config['JWT_EXPIRE_DAYS']))
return {'token': token}, 201
|