Flask模型序列化
Contents
:::tip Flask重写模型序列化 :::
使用过的序列化方式
-
在模型定义的时候给序列化的方法
-
继承 flask 里面的 JSONEncoder类以及default方法
自定义序列化方法
模型定义
class User(BaseModel, UserMixin):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True)
nickname = db.Column(db.String(100))
mobile = db.Column(db.String(20), nullable=False)
email = db.Column(db.String(100))
sex = db.Column(db.Integer)
password_hash = db.Column(db.String(128), nullable=False)
status = db.Column(db.Integer)
roles = db.relationship('Role', secondary='roles_users',
backref=db.backref('users', lazy='dynamic'))
# 自定义模型序列化 user.test_schema()
def test_schema(self):
return {
'id': self.id,
'nickname': self.nickname,
'mobile': self.mobile,
"creat_time": str(self.create_time)
}
view使用
class UserView(Resource):
def __init__(self):
self.reqparse = reqparse.RequestParser()
def get(self, id):
user = User.query.filter_by(id=id).first()
# 调用定义方法
data = user.test_schema()
return jsonify(data)
重写JSONEncoder
在调用json.dumps()方法时默认会使用JSONEncoder进行序列化,传入cls参数后可以使用自定义的序列化方法,源码解析
def dumps(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True,
allow_nan=True, cls=None, indent=None, separators=None,
default=None, sort_keys=False, **kw):
# cls参数的使用介绍
"""
To use a custom ``JSONEncoder`` subclass (e.g. one that overrides the
``.default()`` method to serialize additional types), specify it with
the ``cls`` kwarg; otherwise ``JSONEncoder`` is used.
如果要使用自定义的“JSONEncoder”子类
(例如,重写 default() 方法来序列化其他类型的数据),
请使用使用“cls”关键字参数;否则默认使用“JSONEncoder”。
"""
# 部分源码:传入了 cls ,就会按照重写的方法执行
if cls is None:
cls = JSONEncoder
return cls(
skipkeys=skipkeys, ensure_ascii=ensure_ascii,
check_circular=check_circular, allow_nan=allow_nan, indent=indent,
separators=separators, default=default, sort_keys=sort_keys,
**kw).encode(obj)
原有JSONEncoder的default()方法是不能够对对象进行序列化,需要重写default()方法,然后在调用json.dumps()时通过cls参数传进去
模型定义
需要在User类中定义keys和getitem方法,然后在default函数中使用dict()函数。
基本上每一个模型类都要进行序列化,所以每一个模型类里都要写 keys、__getitem__方法。把一些公共的方法提取到基类BaseModel里面去
class BaseModel(db.Model):
__abstract__ = True
create_time = db.Column(db.DateTime, default=datetime.now)
update_time = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
def __getitem__(self, item):
return getattr(self, item)
def keys(self):
return self.fields
# 支持隐藏字段
def hide(self, *keys):
for key in keys:
self.fields.remove(key)
return self
# 支持添加字段
def append(self, *keys):
for key in keys:
self.fields.append(key)
return self
class User(BaseModel, UserMixin):
__tablename__ = 'user'
id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True)
nickname = db.Column(db.String(100))
mobile = db.Column(db.String(20), nullable=False)
email = db.Column(db.String(100))
sex = db.Column(db.Integer)
password_hash = db.Column(db.String(128), nullable=False)
status = db.Column(db.Integer)
roles = db.relationship('Role', secondary='roles_users',
backref=db.backref('users', lazy='dynamic'))
# SQLALChemy的实例化是不会调用__init__函数,要想调用需要
# from sqlalchemy import orm
# @orm.reconstructor这个装饰器
@orm.reconstructor
def __init__(self):
self.fields = ['id', 'nickname', 'mobile', 'create_time']
重写JSONEncoder
o 对象
from datetime import date
from flask import Flask as _Flask, current_app
from flask.json import JSONEncoder as _JSONEncoder
from application.libs.error_code import ServerError
class JSONEncoder(_JSONEncoder):
def default(self, o):
# 判断,如果对象没有这两个方法的话就返回 ServerError表示服务器内部错误
if hasattr(o, 'keys') and hasattr(o, '__getitem__'):
return dict(o)
# 兼容其他的序列化
if isinstance(o, date):
return o.strftime('%Y-%m-%d %H:%S')
# 自定义
raise ServerError()
# 用我们自定义的 JSONEncoder 替代 Flask 原有的 JSONEncoder
class Flask(_Flask):
json_encoder = JSONEncoder