Python的其他ORM(peewee)

安装

pip3 install peewee
pip3 install pymysql

http://docs.peewee-orm.com/en/latest/peewee/models.html

快速使用

import datetime
from peewee import *

# Alternative backend for quick local experiments:
# db = SqliteDatabase('my_app.db')
# Demo connection settings for a local MySQL server.
db = MySQLDatabase(
    'peewee',
    host='127.0.0.1',
    user='root',
    password='asd123...',
)


class User(Model):
    """Account row identified by a unique username."""

    username = CharField(unique=True)  # unique=True: declared with a uniqueness constraint

    class Meta:
        database = db  # bind this model to the module-level database


class Tweet(Model):
    """Message row that belongs to a User."""

    # Foreign key to User; backref='tweets' is the name given to the reverse side.
    user = ForeignKeyField(User, backref='tweets')
    message = TextField()
    # The callable itself is passed (no parentheses), not a timestamp fixed at import time.
    created_date = DateTimeField(default=datetime.datetime.now)
    is_published = BooleanField(default=True)

    class Meta:
        database = db  # bind this model to the module-level database


if __name__ == '__main__':
    # Create the tables for the models above. connection_context() opens the
    # connection and closes it again when the block exits, even on error,
    # so the connection is not left open.
    with db.connection_context():
        db.create_tables([User, Tweet])
    # Running this script creates the tables; later changes to the table
    # structure have to be made in the database itself.

打印 SQL 语句

import logging

# Echo every statement peewee logs: attach a stream handler to the
# "peewee" logger and lower its threshold to DEBUG.
handler = logging.StreamHandler()
logger = logging.getLogger("peewee")
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

新增数据

# ...

if __name__ == '__main__':
    # ...

    # Insert
    charlie = User(username="charlie")
    # save() INSERTs when the instance has no primary key yet and UPDATEs
    # otherwise; it returns the number of rows affected.
    charlie.save()
    # save(force_insert=True) forces an INSERT even when the primary key is
    # already set. It is meant for manually assigned primary keys; calling it
    # here, right after save(), would re-insert the same row and violate the
    # primary-key / unique constraints, so it is not executed.

    # create() builds the instance and saves it in one step.
    huey = User.create(username="huey")

查询数据

# ...

if __name__ == '__main__':
    # ...

    # Single-row lookups.
    # get() and get_by_id() return a User instance directly and raise
    # User.DoesNotExist when no row matches.
    try:
        charlie = User.get(User.username == "charlie")
        user_by_id = User.get_by_id(2)  # primary key passed as an int
        print(charlie.username)
        print(user_by_id.username)
    except User.DoesNotExist:
        print("查询不到")

    # Select all rows.
    # select() returns a ModelSelect; it only assembles the SQL, and the
    # query is sent when the result is actually used.
    users = User.select()
    print(users.sql())
    print(type(users))

    # Indexing runs the query; raises IndexError when the table is empty.
    user = users[0]
    print(type(user))  # a User instance

    # Filter with IN (...).
    usernames = ["charlie", "huey"]
    users = User.select().where(User.username.in_(usernames))
    for user in users:
        print(user.username)

    # Iterating a select directly also runs the query.
    for user in User.select():
        print(user.username)

更新数据

# Approach 1: load the existing row, change it, then save().
# save() only issues an UPDATE when the instance already carries its primary
# key; a freshly built User(username="charlie") would be INSERTed instead.
charlie = User.get(User.username == "charlie")
charlie.username = "charlie2"
print(charlie.save())  # number of rows affected

# Approach 2: a single UPDATE statement, without loading an instance.
# Only fields declared on User can be updated; this renames the row back.
print(User.update(username="charlie").where(User.username == "charlie2").execute())

删除数据

# Approach 1: fetch the row, then delete that instance.
charlie_filter = User.username == "charlie"
user = User.get(charlie_filter)
user.delete_instance()

# Approach 2: one DELETE statement restricted by where().
delete_huey = User.delete().where(User.username == "huey")
affected_rows = delete_huey.execute()
print(affected_rows)  # number of rows affected

定义表

import datetime
from peewee import *
import logging

# Echo every statement peewee logs to the console.
logger = logging.getLogger("peewee")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# Demo connection settings for a local MySQL server.
db = MySQLDatabase(
    'peewee',
    host='127.0.0.1',
    user='root',
    password='asd123...',
)


class BaseModel(Model):
    """Shared base model: declares add_time and the database binding."""

    # default is the callable itself (no parentheses), not a fixed timestamp.
    add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")

    class Meta:
        database = db  # database connection


class Person(BaseModel):
    """Person record stored in the ``persons`` table."""

    # Indexed, required name.
    name = CharField(
        max_length=10,
        null=False,
        index=True,
        verbose_name="姓名",
    )
    # Required password with a fallback value.
    passwd = CharField(
        max_length=20,
        null=False,
        default='123456',
        verbose_name="密码",
    )
    # Optional e-mail address, unique when present.
    email = CharField(
        max_length=50,
        null=True,
        unique=True,
        verbose_name="邮箱",
    )
    # Optional gender code, 1 unless specified.
    gender = IntegerField(
        null=True,
        default=1,
        verbose_name="性别",
    )
    # Optional date of birth, empty unless specified.
    birthday = DateField(
        null=True,
        default=None,
        verbose_name="生日",
    )
    # Administrator flag, on unless specified.
    is_admin = BooleanField(
        default=True,
        verbose_name="是否是管理员",
    )

    class Meta:
        # Custom table name.
        table_name = 'persons'


if __name__ == '__main__':
    # connection_context() opens the connection and closes it again when the
    # block exits, even on error, so the connection is not left open.
    with db.connection_context():
        db.create_tables([Person])

主键约束

import datetime

from peewee import *
import logging

# Echo every statement peewee logs to the console.
logger = logging.getLogger("peewee")
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

# Demo connection settings for a local MySQL server.
db = MySQLDatabase(
    'peewee',
    host='127.0.0.1',
    user='root',
    password='asd123...',
)


class BaseModel(Model):
    """Shared base model: declares add_time and the database binding."""

    # default is the callable itself (no parentheses), not a fixed timestamp.
    add_time = DateTimeField(default=datetime.datetime.now, verbose_name="添加时间")

    class Meta:
        database = db  # database connection


class Person(BaseModel):
    """Person identified by the (first, last) pair of name columns."""

    first = CharField()
    last = CharField()

    class Meta:
        # Composite primary key over both name columns.
        primary_key = CompositeKey('first', 'last')


class Pet(BaseModel):
    """Pet whose owner is referenced through two owner-name columns."""

    owner_first = CharField()
    owner_last = CharField()
    pet_name = CharField()

    class Meta:
        # Raw table-level constraint: (owner_first, owner_last) references
        # the (first, last) columns of the person table.
        # NOTE(review): assumes Person's table is named "person" — confirm.
        constraints = [SQL('FOREIGN KEY(owner_first,owner_last) REFERENCES person(first,last)')]


class Blog(BaseModel):
    """Blog model that declares no fields of its own beyond BaseModel's."""
    pass


class Tag(BaseModel):
    """Tag model that declares no fields of its own beyond BaseModel's."""
    pass


# Composite primary key
class BlogToTag(BaseModel):
    """Link row pairing one Blog with one Tag."""

    blog = ForeignKeyField(Blog)
    tag = ForeignKeyField(Tag)

    class Meta:
        # The (blog, tag) pair is the primary key.
        primary_key = CompositeKey('blog', 'tag')


if __name__ == '__main__':
    # connection_context() opens the connection and closes it again when the
    # block exits, even on error, so the connection is not left open.
    with db.connection_context():
        db.create_tables([Person, Pet, Tag, Blog, BlogToTag])

多数据插入

# insert() builds an INSERT statement from a dict; execute() runs it.
p_id = Person.insert({
    'first': 'liu',
    'last': 'jin',
}).execute()
print(p_id)

# Named blog_id rather than id so the builtin id() is not shadowed.
# NOTE(review): with an empty dict, add_time is presumably not filled from
# the Python-side default at this point — confirm against the logged SQL.
blog_id = Blog.insert({}).execute()
print(blog_id)

blog = Blog()
print(blog.id)   # not saved yet, so no id has been assigned
blog.save()
print(blog.id)   # id assigned; add_time was set to the current time automatically

# Row-by-row inserts: simple, but one create() call per row.
data_source = [
    {"add_time": datetime.datetime.now()},
    {"add_time": datetime.datetime.now()},
]
for data_dict in data_source:
    # create() must be called on a concrete model, not on the Model base class.
    Blog.create(**data_dict)

# Faster approach: hand all rows to insert_many() in a single call.
blogs = [
    {"add_time": datetime.datetime.now()},
    {"add_time": datetime.datetime.now()},
]
result = Blog.insert_many(blogs).execute()
print(result)

复合条件查询

# Combine conditions with | (OR) and & (AND). Each comparison needs its own
# parentheses because | and & bind tighter than ==.
# Only fields declared on Person (first, last, add_time) can be referenced.
query1 = Person.select().where((Person.first == "liu") | (Person.first == "liu1"))
query2 = Person.select().where((Person.first == "liu") & (Person.last == "jin"))


people = Person.select().where((Person.first == "liu") | (Person.first == "liu1"))
print(type(people))  # a ModelSelect object
for p in people:
    print(p.last)

模糊查询

# Substring match on the first-name column.
first_has_liu = Person.first.contains("liu")
query = Person.select().where(first_has_liu)

# More operators: http://docs.peewee-orm.com/en/latest/peewee/query_operators.html

# Prefix-match variant:
# query = Person.select().where(Person.first.startswith("liu"))
# print(query)

for row in query:
    print(row.first)

limit、排序、去重

# limit: cap the result at two rows
first_two = Person.select().limit(2)
for row in first_two:
    print(row)

# Ordering: desc() sorts descending; ascending is the default.
# Equivalent shorthand: Person.select().order_by(-Person.add_time)
newest_first = Person.add_time.desc()
query = Person.select().order_by(newest_first)
for r in query:
    print(r.add_time)

# De-duplicate on the first-name column
distinct_firsts = Person.select(Person.first).distinct()
for q in distinct_firsts:
    print(q.first)

# Count after de-duplication
distinct_total = Person.select(Person.first).distinct().count()
print(distinct_total)

聚合函数