今天的文章图片就是这个知乎 Live 全文搜索的效果了。如果在 wifi 情况下或者土豪不介意流量的同学可以直接 感受实际使用的动态效果

在我的《Python Web 开发实战》一书中,比较少的介绍到豆瓣自己造的轮子。有读者喷「不够实战」。我显然承认,也很苦闷 ,今天顺便来吐个槽。我刚工作的时候特别愿意混各种技术会议和活动。经常有一些专家在上面讲:我们自己实现了一个 XXX,它有如下多的特性,现在支持了多少个产品线的多少个应用,每天的数据量 YY PB,流量 ZZ...

通篇在讲架构,摆几张高大上的图,甚至能说几个大家不了解的新的玩法都很少,还不断的问坐在下面的领导或者法务:额,这个我能说嘛;那个我能分享么?最最重要的是,他们讲的这些东西大多不是开源的.... 也基本没有一个可以拿得出手的论文,甚至说白了,如果你正好专注这一部分,会发现它也是根据了 FLAG 公司的论文在造轮子罢了,说不定造的还不如你。

听过之后,也没有收获,都是「别人家」的,甚至有种炫耀的感觉而已,至于有什么苦可能只有他们自己知道吧。我就越来越不愿意参加这种会议了。

写书或者博客也是这样。一切东西脱离了公司能提供的基础设施和环境都是空谈,但是这些铺垫说起来就太大了,先不讨论公司有没有授权你在外面说,就是没开源这点就不好弄。说的人都是在虚化的讲一大坨的东西,最多来几个截图之类的(应该还打了马赛克)。等这一大坨东西说的让大家明白了,一本书的厚度肯定不够。但是这些内容呢,只是你为了写书的某一 (几) 章做铺垫而已,有些内容太专太偏,读者大部分场景下是用不到的,好吧,又得骂娘说你这本书不实用...

今天我将基于我过往的实践,以及最近学习 asyncio 和 ES 知识完成一个小程序的 API 服务。看这篇文章前推荐阅读相关的如下文章:

  1. 知乎 Live 全文搜索之模型设计和爬虫实现
  2. 知乎 Live 全文搜索之模型接口
  3. 使用 Python 进行并发编程 - 我为什么不喜欢 Gevent
  4. 使用 Python 进行并发编程 - asyncio 篇 (一)
  5. 使用 Python 进行并发编程 - asyncio 篇 (二)
  6. 使用 Python 进行并发编程 - asyncio 篇 (三)
  7. 知乎 Live 全文搜索之让 elasticsearch_dsl 支持 asyncio

技术选型

  1. Sanic ,基于 Python 3.5 + 的异步 web 服务器,和 Flask 一样使用装饰器作为路由,支持 Blueprint。效果确实非常快。
  2. uvloop ,Sanic 默认使用 uvloop,这个实现基于 libuv,比 asyncio 默认的 loop 的块很多。
  3. marshmallow ,一个轻量级的转化复杂对象成为 Python 自带数据类型的库,为什么用它未来会详细介绍。

使用 Schema

了解过关系型数据库如 MySQL 的同学会比较熟悉 Schema 这个词,它是对数据库的结构描述。Schema 定义了表与表和表与字段之间的关系。在使用关系型数据库之前第一件事要先定 Schema,创建表 (库) 再去操作。

为什么我们在实现 RESTful API 的时候要考虑使用 Schema 呢?

  1. 首先 API 服务是给外部用的,比如移动端 (安卓、IOS 等),前端 (用 AJAX) 等。那么大家一开始就要协商好那些字段,以及字段的类型。因为你不关系,他们都是关心的,如果设计有问题会由于没有正确处理而造成移动端闪退等严重问题。
  2. 你需要对 API 返回的数据进行良好的管理和验证。

marshmallow 把一组数据映射成一个类:

from marshmallow import Schema, fields

class UserSchema(Schema):
    id = fields.Integer()
    url = fields.Str()
    name = fields.Str() 
    ...

这样既让不同编程语言的开发者一目了然,也能检验你提供的数据是不是按照这个定好的结构返回的。

同一组数据可以定义多种 Schema

思考一下,在搜索页面,每一项提供的空间有限,你无法把 Live 的全部信息(比如「描述」这种很长的内容的字段)都展示出来,也就是就算都返回了,其实只用了一部分字段,这造成了更多的网络延时和带宽消耗。但是在 Live 详情页理论上就可以展示全部的字段的内容了。

再假设下,如果是返回一部分,还是返回全部字段在后端做,就是不同的方法返回时对 to_dict 方法对一对 if/elif/else 的处理,其实看起来很乱。我是这样用的:

class UserSchema(Schema):
    id = fields.Integer()
    url = fields.Str()
    name = fields.Str()
    ...


class UserFullSchema(UserSchema):
    lives_url = fields.Str()
    speaker_id = fields.Str() 
    ...

也就是定义了多种 User 的 schema,按需选择。但是后端统一使用 to_dict 方法返回全部数据,在视图渲染的时候进行筛选。这样需要用一种好的表达方式:

from views.utils import marshal_with

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request):
    ...


@bp.route('/suggest')
@marshal_with(LiveSchema)
async def suggest(request):
    ...


@bp.route('/user/<user_id>')
@marshal_with([LiveFullSchema, UserFullSchema])
async def user(request, user_id):
    ...

我们先不考虑视图内的逻辑,简单的理解成他们是单个 live 的 to_dict 结果或者多个 live 的 to_dict 结果的列表

通过神奇的 marshal_with 装饰器传入你希望返回符合那种 Schema 的数据。

现在揭晓一下:

def marshal(data, fields):
    schemas = [field() for field in fields]
    if isinstance(data, (list, tuple)):
        return [marshal(d, fields) for d in data]

    type = data.get('type')
    for schema in schemas:
        if type in schema.__class__.__name__.lower():
            result, errors = schema.dump(data)
            if errors:
                for item in errors.items():
                    print('{}: {}'.format(*item))
            return result


class marshal_with(object):
    def __init__(self, fields):
        if not isinstance(fields, list):
            fields = [fields]
        self.fields = fields

    def __call__(self, f):
        @wraps(f)
        async def wrapper(*args, **kwargs):
            resp = await f(*args, **kwargs)
            return marshal(resp, self.fields)
        return wrapper

这个是 async 版本的,大家可以自己发散成 Python 2 的普通版。记得之前我们在 model 里面给每种数据的 to_dict 方法里面加了个{'type': 'live'}这种键值么,除了在小程序里面分辨数据的类型,在这里也是用来匹配那种 schema 的。举个例子,假如是一个 user 类型的数据。

@marshal_with([LiveFullSchema, UserFullSchema])的装饰下,由于 UserFullSchema 类名包含了 live 所以符合了。这比较黑科技一点..

深入使用 marshmallow

marshmallow 除了生成一个可读性很好的类和验证该字段是不是类型符合以外,还支持序列化和反序列化的处理。有什么意义呢。假如如下的 schema:

class LiveSchema(Schema):
    starts_at = fields.Date()

注意我们 model 存的 starts_at 是一个 datetime 类型的对象,无法被 json 序列化,所以返回之前应该先转化成字符串:

class LiveSchema(Schema):
    starts_at = fields.Method('get_start_time')

    def get_start_time(self, obj):
         return int(obj['starts_at'].strftime('%s'))

再举个例子:

class UserSchema(Schema):
    bio = fields.Str()
    headline = fields.Str() 
    description = fields.Str()

bio/headline/description 这三个字段内容都有可能比较长,需要做不同的截取:在详情页显示全部,在搜索页之前显示前 2 行.. 我们需要一个 truncate 函数:

WIDTH = 45

def truncate_utf8(str, width=WIDTH):
    return str[:width] + '...' if len(str) > width else str

假如使用 fields.Method 你就要写三方法,因为它指定的方法只有self, obj2 个参数,而且 Schema 是不能继承的。这么办呢?有我的书中介绍的 partialmethod 可完美实现:

from functools import partialmethod

class Item(object):
    def truncate(self, attr, obj):
        if attr not in obj:
            return ''
        return truncate_utf8(obj[attr], WIDTH)


class UserSchema(Schema):
    bio = fields.Method('truncate_bio')
    headline = fields.Method('truncate_headline')
    description = fields.Method('truncate_description')
    truncate_headline = partialmethod(Item.truncate, 'headline')
    truncate_bio = partialmethod(Item.truncate, 'bio')
    truncate_description = partialmethod(Item.truncate, 'description')

通过 partialmethod + 非继承至 Schema 的类就可以实现继承和额外参数了。

对 Sanic 定制

我们都知道,当你有独特的需求而框架不满足的时候,就要对其进行二次开发或者封装。我一般倾向基于框架提供的灵活性去封装。由于在多个 API 上都有分页的需要,参数中会出现 start/limit(当然你可以更喜欢其他的词汇)。如果不定制,那么在每个视图里面都要加一句:

@bp.route('/search')
@marshal_with([LiveSchema, UserSchema])
async def search(request): 
    start = request.args.get('start', 0)
    limit = request.args.get('limit', 10) 
    ...

这 2 句就是个累赘。怎么做呢? 利用 sanic 提供的中间件就好了:

@app.middleware('request')
async def halt_request(request):
    request.start = request.args.get('start', 0)
    request.limit = request.args.get('limit', 10)

但是这还不够,因为 sanic 在创建 Request 的时候基于内存的考虑使用了__slots__,所以我们要重新写 on_headers_complete 方法:

from sanic.server import HttpProtocol, CIMultiDict
from sanic.request import Request as _Request 


class Request(_Request):
    __slots__ = (
        'url', 'headers', 'version', 'method', '_cookies',
        'query_string', 'body', 'start', 'limit',
        'parsed_json', 'parsed_args', 'parsed_form', 'parsed_files',
    )


class JSONHttpProtocol(HttpProtocol):
    def on_headers_complete(self):
        remote_addr = self.transport.get_extra_info('peername')
        if remote_addr:
            self.headers.append(('Remote-Addr', '%s:%s' % remote_addr))

        self.request = Request(
            url_bytes=self.url,
            headers=CIMultiDict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode()
        )

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8300, protocol=JSONHttpProtocol,
            workers=4, debug=True)

这样在视图中就可以直接使用 request.start 和 request.limit 了。

其次看官方用法,都是在视图中控制返回的内容的类型,比如:

from sanic.response import json

@app.route('/')
async def test(request):
    return json({"hello": "world"})

我也希望封装全部的返回结果的格式为:

{'rs': data}

PS: 当然生产环境中应该还有一个 error 字段甚至 error_code 字段标识如果出错的信息和类型等字段,我这里作为演示就保留了一个 rs 字段

这个 data 就是实际的视图返回的结果,但是在响应的时候已经封好了。可以继续重写 write_response 方法:

class JSONHttpProtocol(HttpProtocol):
    def write_response(self, response):
        if isinstance(response, str):
            response = text(response)
        elif isinstance(response, (list, dict)):
            response = {'rs': response}
        if isinstance(response, dict):
            response = json(response)

        return super().write_response(response)

这样返回的结果就很统一了。

至此,一个异步的、风格明确的、功能满足需要的 API 服务就完成了。

项目具体代码可见:https://github.com/dongweiming/weapp-zhihulive