Tornado Demo 之 chatdemo 不完全解读 - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
rapospectre
V2EX    Python

Tornado Demo 之 chatdemo 不完全解读

  •  
  •   rapospectre
    bluedazzle 2016 年 6 月 23 日 3196 次点击
    这是一个创建于 3488 天前的主题,其中的信息可能已经有所发展或是发生改变。

    tornado 源码自带了丰富的 demo ,这篇文章主要分析 demo 中的聊天室应用: chatdemo

    首先看 chatdemo 的目录结构:

    ├── chatdemo.py ├── static │ ├── chat.css │ └── chat.js └── templates ├── index.html ├── message.html └── room.html 

    非常简单,基本没有分层,三个模版一个 js 一个 css ,还有一个最重要的 chatdemo.py

    本文的重点是弄清楚 chatdemo.py 的运行流程,所以对于此项目的其他文件,包括模版及 chat.js 的实现都不会分析,只要知道 chat.js 的工作流程相信对于理解 chatdemo.py 没有任何问题

    此 demo 主要基于长轮询。 获取新消息的原理:

    1. 在 chat.js 中有一个定时器会定时执行 update 操作
    2. 当没有新消息时 tornado 会一直 hold 住 chat.js 发来的 update 请求
    3. 当有新消息时 tornado 将包含新消息的数据返回给所有 hold 的 update 请求
    4. 此时 chat.js 收到 update 回复后更新返回数据在聊天室中,同时再进行一次 update 请求, 然后又从 1. 开始执行。

    发送新消息的原理:

    1. 输入消息, 点击 post 按钮, chat.js 获取表单后用 ajax 方式发送请求 new
    2. tornado 收到请求 new ,返回消息本身, 同时通知所有 hold 住的 update 请求 ( 这里也包括发送 new 请求的 chat.js 所发送的 update 请求 ) 返回新消息
    3. 所有在线的 chat.js 收到 update 请求回复,更新返回信息到聊天室,同时再进行一次 update 请求。

    清楚了以上流程,我们直接来看 chatdemo.py :

    def main(): parse_command_line() app = tornado.web.Application( [ (r"/", MainHandler), (r"/a/message/new", MessageNewHandler), (r"/a/message/updates", MessageUpdatesHandler), ], cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__", template_path=os.path.join(os.path.dirname(__file__), "templates"), static_path=os.path.join(os.path.dirname(__file__), "static"), xsrf_cookies=True, debug=options.debug, ) app.listen(options.port) tornado.ioloop.IOLoop.current().start() if __name__ == "__main__": main() 

    main 函数主要用作初始化应用、监听端口以及启动 tornado server 。 我们看路由:

    1. 主页对应 MainHandler
    2. new 请求对应 MessageNewHandler
    3. updates 请求对应 MessageUpdatesHandler

    下面来看 MainHandler :

    # Making this a non-singleton is left as an exercise for the reader. global_message_buffer = MessageBuffer() class MainHandler(tornado.web.RequestHandler): def get(self): self.render("index.html", messages=global_message_buffer.cache) 

    只有一行代码,就是渲染并返回 index.html ,渲染的附加信息就是 global_message_buffer 的所有缓存消息。 global_message_buffer 是 MessageBuffer 的一个实例。 我们先不关心 MessageBuffer 内部是什么,现在我们只要记住它主要是用来储存聊天消息和连接到此聊天室的人的信息的类。 其中 MessageBuffer().cache 就是保存聊天室所有聊天消息的结构。

    然后来看 MessageNewHandler :

    class MessageNewHandler(tornado.web.RequestHandler): def post(self): message = { "id": str(uuid.uuid4()), "body": self.get_argument("body"), } # to_basestring is necessary for Python 3's json encoder, # which doesn't accept byte strings. message["html"] = tornado.escape.to_basestring( self.render_string("message.html", message=message)) if self.get_argument("next", None): self.redirect(self.get_argument("next")) else: self.write(message) global_message_buffer.new_messages([message]) 

    同样很简单,从 post 信息里获取发来的新消息 ( body ) ,然后给消息分配一个唯一的 uuid ,接着把这段消息渲染成一段 html ,然后 self.write(message) 返回这段 html , 同时给 global_message_buffer ( MessageBuffer 的实例 ) 添加这条新信息。 这里其实我更倾向于返回 json 之类的数据,这样更加直观和规范,可能写 demo 的人考虑到读者对 json 之类的协议可能不熟悉故而选择了返回渲染好的 html 直接让 chat.js append 到 index.html 里。

    接着来看 MessageUpdatesHandler :

    class MessageUpdatesHandler(tornado.web.RequestHandler): @gen.coroutine def post(self): cursor = self.get_argument("cursor", None) # Save the future returned by wait_for_messages so we can cancel # it in wait_for_messages self.future = global_mssage_buffer.wait_for_messages(cursor=cursor) messages = yield self.future if self.request.connection.stream.closed(): return self.write(dict(messages=messages)) def on_connection_close(self): global_message_buffer.cancel_wait(self.future) 

    重点就在这里,可以看到其内部的 post 方法被 gen.coroutine 修饰器修饰,也就是说这个 post 方法现在是 协程 ( coroutine ) 方式工作。 对于协程比较陌生的童鞋,你可以直接把它当作是单线程解决 io ( 网络请求 ) 密集运算被阻塞而导致低效率的解决方案。 当然这样理解协程还比较笼统,之后我会详细写一篇关于协程的文章,但在这里这样理解是没有问题的。

    现在来看代码内容,首先获取 cursor ,一个用来标识我们已经获取的消息的指针,这样 tornado 就不会把你已经获取的消息重复的发给你。 然后调用 global_message_buffer.wait_for_messages(cursor=cursor) 获取一个 future 对象。 future 对象是 tornado 实现的一个特殊的类的实例,它的作用就是包含之后 ( 未来 ) 将会返回的数据,我们现在不用关心 Future() 内部如何实现,只要记住上面它的作用就行。 关于 Future 的解读我会放到阅读 Future 源码时讲。

    然后看最关键的这句: messages = yield self.future 注意这里的 yield 就是 hold updates 请求的关键,它到这里相当于暂停了整个 post 函数 ( updates 请求被 hold )同时也相当于 updates 这次网络请求被阻塞,这个时候协程发挥作用,把这个函数暂停的地方的所有信息保存挂起,然后把工作线程释放,这样 tornado 可以继续接受 new 、 updates 等请求然后运行相应的方法处理请求。

    当有新的消息返回时,tornado 底层的 ioloop 实例将会调用 gen.send(value) 返回新消息( value )给每个被暂停的方法的 yield 处, 此时协程依次恢复这些被暂停的方法, 同时用获得的返回消息继续执行方法, 这时 messages = yield self.future 继续执行, messages 获得 yield 的返回值 value ( python 中调用 gen.send(value) 将会把 value 值返回到 yield 处并替换原前 yield 后的值 ),然后判断下用户是否已经离开,如果还在线则返回新消息。

    明白了以上流程,我们最后来看 MessageBuffer:

    class MessageBuffer(object): def __init__(self): self.waiters = set() self.cache = [] self.cache_size = 200 def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future def cancel_wait(self, future): self.waiters.remove(future) # Set an empty result to unblock any coroutines waiting. future.set_result([]) def new_messages(self, messages): logging.info("Sending new message to %r listeners", len(self.waiters)) for future in self.waiters: future.set_result(messages) self.waiters = set() self.cache.extend(messages) if len(self.cache) > self.cache_size: self.cache = self.cache[-self.cache_size:] 

    初始化方法中 self.waiters 就是一个等待新消息的 listener 集合 ( 直接理解成所有被 hold 住的 updates 请求队列可能更清晰 )

    self.cache 就是储存所有聊天消息的列表,self.cache_size = 200 则定义了 cache 的大小 是存 200 条消息。

    然后先来看简单的 new_messages:

    遍历 waiters 列表,然后给所有的等待者返回新消息,同时清空等待者队列。 然后把消息加到缓存 cache 里,如果缓存大于限制则取最新的 200 条消息。这里只要注意到 future.set_result(messages) 就是用来给 future 对象添加返回数据 ( 之前被 yield 暂停的地方此时因为 set_result() 方法将会获得 "未来" 的数据 ) 这一点即可。

    然后来看 wait_for_messages :

     def wait_for_messages(self, cursor=None): # Construct a Future to return to our caller. This allows # wait_for_messages to be yielded from a coroutine even though # it is not a coroutine itself. We will set the result of the # Future when results are available. result_future = Future() if cursor: new_count = 0 for msg in reversed(self.cache): if msg["id"] == cursor: break new_count += 1 if new_count: result_future.set_result(self.cache[-new_count:]) return result_future self.waiters.add(result_future) return result_future 

    首先初始化一个 Future 对象,然后根据 cursor 判断哪些消息已经获取了哪些还没获取,如果缓存中有对于这个 waiter 还没获取过的消息,则直接调用 set_result() 返回这些缓存中已有的但对于这个 waiter 来说是新的的数据。 如果这个 waiter 已经有缓存中的所有数据,那么就把它加到等待者队列里保持等待,直到有新消息来时调用 new_messages 再返回。

    而最后一个 cancel_wait 就很简单了,当有用户退出聊天室时,直接从 self.waiters 中移除他所对应的等待者。

    当明白了整个代码的运行流程后,我们可以基于这个简单的 demo 而写出更加丰富的例子,比如加入 session ,做登陆、做好友关系,做单聊做群聊等等。

    chatdemo with room是我添加的一个简单功能,输入聊天室房号后再进行聊天,只有同一房间中的人才能收到彼此的消息。

    以上就是鄙人对整个 chatdemo.py 的解读。 在阅读此 demo 时,我没有参考其他源码解读,只是通过阅读 tornado 底层的源码而得出的个人的理解,因此肯定会有很多理解不成熟甚至错误的地方,还望大家多多指教。

    原文地址

    作者:rapospectre

    4 条回复    2016-11-02 15:46:19 +08:00
    mornlight
        1
    mornlight  
       2016 年 6 月 24 日   1
    哈哈哈哈哈,你这博客的评论功能越做越不行了啊,比上次我留言的时候更差了
    mornlight
        2
    mornlight  
       2016 年 6 月 24 日
    div id="r_3309213" class="cell">
    rapospectre
        3
    rapospectre  
    OP
       2016 年 6 月 25 日
    @mornlight 我有罪,用户体验好差,以后上线前一定做好测试,呜呜呜,已经跪着把 bug 修好了,谢谢你. U ● ● U
    burnex
        4
    burnex  
       2016 年 11 月 2 日
    @rapospectre 多谢楼主深入浅出的讲解
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     2661 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 32ms UTC 11:29 PVG 19:29 LAX 03:29 JFK 06:29
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86