在上一篇 项目设计 中,我说到了 SSE(Server-Sent Events)是为了实现单方向的消息推送,今天介绍下实际的使用。

我直接用了现成的 Flask-SSE ,其实 SSE 实现的原理比较简单:

  1. 借用 Redis 的发布 / 订阅模式创建一个方法,方法内会调用 pubsub.listen 监听新的发布数据。
  2. 使用 Flask 提供的 stream_with_context,不断的从上面的方法中获取数据。

使用起来分 2 部分

前端

在前端页面添加一个函数:

function eventSourceListener() {
    let source = new EventSource(`${API_URL}/stream`);
    let self = this;
    source.addEventListener('login', function(event) {
        let data = JSON.parse(event.data);
        if (data.type == 'scan_qr_code') {
            self.uuid = data.uuid;
            self.qrCode = `data:image/png;base64,${data.extra}`;
        } else if (data.type == 'confirm_login') {
            self.sub_title = 'Scan successful';
            self.sub_desc = 'Confirm login on mobile WeChat';
            self.qrCode = data.extra;
        } else if (data.type == 'logged_in') {
            sessionStorage.setItem('user', JSON.stringify(data.user));
            self.$router.push({ path: '/main' });
        } else if (data.type == 'logged_out') {
            sessionStorage.removeItem('user');
            self.$router.push('/login');
        }
    }, false);

    source.addEventListener('notification', function(event) {
        let data = JSON.parse(event.data);
        self.notificationCount = data.count;
    }, false);

    source.addEventListener('error', function(event) {
        console.log("Failed to connect to event stream");
    }, false);
}

这段代码放在一个自定义的 Vue 的插件里面,这样在所有页面上都要自动包含这部分代码了。source.addEventListener 用来添加事件监听,它监听了 3 种类型的消息:

  1. login 登录,也就是在页面反映当前微信的登录状态(等待扫码 / 扫码完成等待确认 / 确认完成)。不同的消息会执行不同的操作,页面也会立刻渲染出最新的结果。
  2. notification 消息提醒,会有一个异步任务定期检查新入库的消息,有新的消息就是发布出来通知新消息数。
  3. error 内置的错误消息,当然这个加不加倒还好

另外在登陆后执行sessionStorage.setItem('user', JSON.stringify(data.user));会设置浏览器的 session,下次自动登录后右侧就显示头像了,这样能减少后端的负担,退出时 removeItem 方法再删掉。

后端

后端包含 2 部分,第一部分是用 Flask 实现上面说的${API_URL}/stream这个接口,这是一个长连接,消息就是从这里推送出去的。由于第一部分是阻塞的,我们需要异步的方式往这个阻塞进程里面推送数据,也就是开头说的,利用 Redis 的发布 / 订阅模式发布消息。比如通知调用起来是 这样的

from app import app as sse_api

with sse_api.app_context():
    sse.publish({'count': count}, type='notification')

登陆过程要复杂一些,我之前说过在我 fork 的 ItChat 和 wxpy 分支里面添加了信号的支持,这个信号是需要「注册」的,也就是在 import 之前就要注册,效果要类似 这样

from itchat.signals import scan_qr_code, confirm_login, logged_out

def publish(uuid, **kw):
    from app import app
    with app.app_context():
        params = {'uuid': uuid, 'extra': kw.pop('extra', None),
                  'type': kw.pop('type', None)}
        params.update(kw)
        sse.publish(params, type='login')


scan_qr_code.connect(publish)
confirm_login.connect(publish)
logged_out.connect(publish)

from wxpy import *  # noqa

这里用了信号的 connect 方法。举个 logged_out 的例子,在 ItChat 里面,首先 定义这个信号

from blinker import Namespace

_signals = Namespace()
logged_out  = _signals.signal('logged-out')

需要在对应发信号的地方调用 send 方法

logged_out.send(self.uuid, type='logged_out')

另外有个坑儿,首次打开 Web 页面的是一个铺满 div 的 gif 图片,一开始设想的是在下载二维码图片之后,通过修改 img 的 src 属性指到这个图片,实际开发中发现,这个二维码图片被会更新不及时,会使用缓存的就图片所以发送信号的时候不使用图片 HTTP 地址,而是 Data URLs,这就需要把图片内容编码一下:

encoded = base64.b64encode(qrStorage.getvalue()).decode('ascii')              
scan_qr_code.send(self.uuid, extra=encoded, type='scan_qr_code')

结语

这样借助 Redis 和 Celery 就实现了 SSE 的使用,下一节我将介绍 Celery 的使用。