前言
很多人对这个orm框架有很大的误区 以为会一个orm框架就不用去深入sql语句了 这会导致sql不会写 查询的时候实现困难 适应别的语言的orm框架还得再学一遍。 所以说sql一定一定要学好,这样查询实现的原理就会顺通,学习其他语言的orm框架就会快很多
ORM(Object Relational Mapping,对象关系映射),在Python下?有很多这样的类库,如 SQLObject、Storm、peewee和SQLAlchemy。
介绍?下Peewee的基本使?,因为它?常的轻量级,最主要是和Django的ORM 操作很像,如果你学过Django那么很容易上?。
一、peewee的安装和入门
peewee官方文档:http://docs.peewee-orm.com/en/latest/peewee/models.html
安装
pip install peewee
pip install pymysql
1.字段类型表&Meta类型表&类型属性表
字段类型表
可以看到对应关系 比如IntegerField 在mysql中是integer
Meta类型表
类型属性表
2.设计表结构
import datetime
from peewee import *
import logging
logger = logging.getLogger("peewee")
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())
db = MySQLDatabase('peewee', host='127.0.0.1', user='root', passwd='******')
class User(Model):
username = CharField(primary_key=True, max_length=20)
age = CharField(default=18, max_length=20, verbose_name="年龄")
class Meta:
database = db
class Tweet(Model):
user = ForeignKeyField(User, backref='tweets')
message = TextField()
created_date = DateTimeField(default=datetime.datetime.now)
is_published = BooleanField(default=True)
class Meta:
database = db
防止连接丢失
如果是在项目中,可能会因为一个语句报错而丢失连接 而导致项目崩溃
from playhouse.shortcuts import ReconnectMixin
from playhouse.pool import PooledMySQLDatabase
class ReconnectMySQLDatabase(ReconnectMixin, PooledMySQLDatabase):
pass
db = ReconnectMySQLDatabase("mxshop_goods_srv", host="localhost", port=3306, user="root", password="123456")
二、表的设计&操作
0.继承
在设计表的时候可以设计一个BaseModel来继承它
class BaseModel(Model):
add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")
class Meta:
database = db
class Person(BaseModel):
first = CharField()
1.添加
生成表
if __name__ == "__main__":
db.connect()
db.create_tables([User, Tweet])
原生SQL
query = User.raw('SELECT * FROM new_user WHERE username = %s', "bobby5")
query = User.select().where(SQL('username = "%s"' % "bobby5"))
for q in query:
print(q.username, q.age)
添加
if __name__ == "__main__":
huey = User.create(username="huey")
charlie = User(username="charlie")
rows = charlie.save()
if rows == 0:
print("未更新数据")
Person.insert({
'first': 'li3',
'last': 'bobby3'
}).execute()
2.查询
try:
charlie = User.get_by_id("charie")
print(charlie.username)
except User.DoesNotExist as e:
print("查询不到")
users = User.select()
print(type(users))
print(type(user))
usernames = ["charlie", "huey", "mickey"]
users = User.select().where(User.username.in_(usernames))
for user in users:
print(user.username)
for user in User.select():
print(user.username)
3.更新
charlie = User(username="charlie")
print(charlie.save())
print(User.update(age=20).where(User.username=="charlie").execute())
4.删除
user = User.get(User.username=="huey")
user.delete_instance()
5.约束
class Person(BaseModel):
first = CharField()
last = CharField()
class Meta:
primary_key = CompositeKey('first', 'last')
class Pet(BaseModel):
owner_first = CharField()
owner_last = CharField()
pet_name = CharField()
class Meta:
constraints = [SQL('FOREIGN KEY(owner_first, owner_last) REFERENCES person(first, last)')]
6.复合主键
class Blog(BaseModel):
pass
class Tag(BaseModel):
pass
class BlogToTag(BaseModel):
"""A simple "through" table for many-to-many relationship."""
blog = ForeignKeyField(Blog)
tag = ForeignKeyField(Tag)
class Meta:
primary_key = CompositeKey('blog', 'tag')
三、详细操作
1.like&排序&dict
query = Person.select().where(Person.first.startswith('bo'))
users = User.select().order_by(-User.age)
for user in users:
print(user.username, user.age)
person = Person.select().order_by(Person.id.desc())
for row in person:
print(row)
query = Person.select().dicts()
for row in query:
print(type(row))
print(row)
2.去重&统计
query = User.select(User.username).distinct().count()
print(query)
for q in query:
print(q.username)
print(query)
"""
何时使用one():
如果您有一个应该返回 1 个结果的查询,否则会引发异常——即使它返回 0 个结果。换句话说,它不允许 empty results.
何时使用 scalar():
如果您有一个返回 1 个结果或没有结果的查询。否则抛出异常。换句话说,它确实允许 empty results.
"""
max_age = User.select(fn.MAX(User.age)).scalar()
users = User.select().where(User.age==max_age)
for user in users:
print(user.username)
sub_query = User.select(fn.MAX(User.age))
query = User.select(User.username).where(User.age==sub_query)
for q in query:
print(q.username)
query = User.raw('SELECT * FROM new_user WHERE username = %s', "bobby5")
query = User.select().where(SQL('username = "%s"' % "bobby5"))
for q in query:
print(q.username, q.age)
四、避免n+1查询
query = Tweet.select(Tweet, User.username).join(User).where(User.username == "mickey")
for q in query:
print(q.user.username, q.content)
|