深入理解 tornado 之 底层 ioloop 实现(二) - V2EX
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
推荐学习书目
Learn Python the Hard Way
Python Sites
PyPI - Python Package Index
http://diveintopython.org/toc/index.html
Pocoo
值得关注的项目
PyPy
Celery
Jinja2
Read the Docs
gevent
pyenv
virtualenv
Stackless Python
Beautiful Soup
结巴中文分词
Green Unicorn
Sentry
Shovel
Pyflakes
pytest
Python 编程
pep8 Checker
Styles
PEP 8
Google Python Style Guide
Code Style from The Hitchhiker's Guide
rapospectre
V2EX    Python

深入理解 tornado 之 底层 ioloop 实现(二)

  •  
  •   rapospectre
    bluedazzle 2016-06-06 21:14:50 +08:00 3926 次点击
    这是一个创建于 3503 天前的主题,其中的信息可能已经有所发展或是发生改变。
    def configurable_default(cls): """Returns the implementation class to be used if none is configured.""" raise NotImplementedError() 

    显然也是个接口,那么我们再回头看 ioloop 的 configurable_default():

    def configurable_default(cls): if hasattr(select, "epoll"): from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop 

    原来这是个工厂函数,根据不同的操作系统返回不同的事件池( linux 就是 epoll , mac 返回 kqueue ,其他就返回普通的 select 。 kqueue 基本等同于 epoll , 只是不同系统对其的不同实现)

    现在线索转移到了 tornado.platform.epoll.EPollIOLoop 上,我们再来看看 EPollIOLoop:

    tornado.platform.epoll.EPollIOLoop

    import select from tornado.ioloop import PollIOLoop class EPollIOLoop(PollIOLoop): def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) 

    EPollIOLoop 完全继承自 PollIOLoop注意这里是 PollIOLoop 不是 IOLoop)并只是在初始化时指定了 impl 是 epoll ,所以看起来我们用 IOLoop 初始化最后初始化的其实就是这个 PollIOLoop,所以接下来,我们真正需要理解和阅读的内容应该都在这里:

    tornado.ioloop.PollIOLoop

    class PollIOLoop(IOLoop): """Base class for IOLoops built around a select-like function. For concrete implementations, see `tornado.platform.epoll.EPollIOLoop` (Linux), `tornado.platform.kqueue.KQueueIOLoop` (BSD and Mac), or `tornado.platform.select.SelectIOLoop` (all platforms). """ def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} self._events = {} self._callbacks = [] self._callback_lock = threading.Lock() self._timeouts = [] self._cancellatiOns= 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) def set_blocking_signal_threshold(self, seconds, action): if not hasattr(signal, "setitimer"): gen_log.error("set_blocking_signal_threshold requires a signal module " "with the setitimer method") return self._blocking_signal_threshold = seconds if seconds is not None: signal.signal(signal.SIGALRM, action if action is not None else signal.SIG_DFL) def start(self: ... try: while True: # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. with self._callback_lock: callbacks = self._callbacks self._callbacks = [] # Add any timeouts that have come due to the callback list. # Do not run anything until we have determined which ones # are ready, so timeouts that call add_timeout cannot # schedule anything in this iteration. due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: # The timeout was cancelled. Note that the # cancellation check is repeated below for timeouts # that are cancelled by another timeout or callback. heapq.heappop(self._timeouts) self._cancellations -= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break if (self._cancellations > 512 and self._cancellations > (len(self._timeouts) >> 1)): # Clean up the timeout queue when it gets large and it's # more than half cancellations. self._cancellatiOns= 0 self._timeouts = [x for x in self._timeouts if x.callback is not None] heapq.heapify(self._timeouts) for callback in callbacks: self._run_callback(callback) for timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) # Closures may be holding on to a lot of memory, so allow # them to be freed before we go into our poll wait. callbacks = callback = due_timeouts = timeout = None if self._callbacks: # If any callbacks or timeouts called add_callback, # we don't want to wait in poll() before we run them. poll_timeout = 0.0 elif self._timeouts: # If there are any timeouts, schedule the first one. # Use self.time() instead of 'now' to account for time # spent running callbacks. poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: # No timeouts and no callbacks, so use the default. poll_timeout = _POLL_TIMEOUT if not self._running: break if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: event_pairs = self._impl.poll(poll_timeout) except Exception as e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if errno_from_exception(e) == errno.EINTR: continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events) except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # Happens when the client closes the connection pass else: self.handle_callback_exception(self._handlers.get(fd)) except Exception: self.handle_callback_exception(self._handlers.get(fd)) fd_obj = handler_func = None finally: # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0) IOLoop._current.instance = old_current if old_wakeup_fd is not None: signal.set_wakeup_fd(old_wakeup_fd) def stop(self): self._running = False self._stopped = True self._waker.wake() def time(self): return self.time_func() def call_at(self, deadline, callback, *args, **kwargs): timeout = _Timeout( deadline, functools.partial(stack_context.wrap(callback), *args, **kwargs), self) heapq.heappush(self._timeouts, timeout) return timeout def remove_timeout(self, timeout): # Removing from a heap is complicated, so just leave the defunct # timeout object in the queue (see discussion in # http://docs.python.org/library/heapq.html). # If this turns out to be a problem, we could add a garbage # collection pass whenever there are too many dead timeouts. timeout.callback = None self._cancellations += 1 def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: # If we're in the IOLoop's thread, we know it's not currently # polling. If we're not, and we added the first callback to an # empty list, we may need to wake it up (it may wake up on its # own, but an occasional extra wake is harmless). Waking # up a polling IOLoop is relatively expensive, so we try to # avoid it when we can. self._waker.wake() def add_callback_from_signal(self, callback, *args, **kwargs): with stack_context.NullContext(): if thread.get_ident() != self._thread_ident: # if the signal is handled on another thread, we can add # it normally (modulo the NullContext) self.add_callback(callback, *args, **kwargs) else: # If we're on the IOLoop's thread, we cannot use # the regular add_callback because it may deadlock on # _callback_lock. Blindly insert into self._callbacks. # This is safe because the GIL makes list.append atomic. # One subtlety is that if the signal interrupted the # _callback_lock block in IOLoop.start, we may modify # either the old or new version of self._callbacks, # but either way will work. self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) 

    果然, PollIOLoop 继承自 IOLoop 并实现了它的所有接口,现在我们终于可以进入真正的正题了:joy:

    ioloop 分析

    首先要看的是关于 epoll 操作的方法,还记得前文说过的 epoll 只需要四个 api 就能完全操作嘛? 我们来看 PollIOLoop 的实现:

    epoll 操作

    def add_handler(self, fd, handler, events): fd, obj = self.split_fd(fd) self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True) 

    epoll_ctl:这个三个方法分别对应 epoll_ctl 中的 add 、 modify 、 del 参数。 所以这三个方法实现了 epoll 的 epoll_ctl 。

    epoll_create:然后 epoll 的生成在前文 EPollIOLoop 的初始化中就已经完成了:super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)。 这个相当于 epoll_create 。

    epoll_wait: epoll_wait 操作则在 start() 中:event_pairs = self._impl.poll(poll_timeout)

    epoll_close:而 epoll 的 close 则在 PollIOLoop 中的 close 方法内调用: self._impl.close() 完成。

    initialize

    接下来看 PollIOLoop 的初始化方法中作了什么:

    def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) self._impl = impl # 指定 epoll if hasattr(self._impl, 'fileno'): set_close_exec(self._impl.fileno()) # fork 后关闭无用文件描述符 self.time_func = time_func or time.time # 指定获取当前时间的函数 self._handlers = {} # handler 的字典,储存被 epoll 监听的 handler ,与打开它的文件描述符 ( file descriptor 简称 fd ) 一一对应 self._events = {} # event 的字典,储存 epoll 返回的活跃的 fd event pairs self._callbacks = [] # 储存各个 fd 回调函数的列表 self._callback_lock = threading.Lock() # 指定进程锁 self._timeouts = [] # 将是一个最小堆结构,按照超时时间从小到大排列的 fd 的任务堆( 通常这个任务都会包含一个 callback ) self._cancellatiOns= 0 # 关于 timeout 的计数器 self._running = False # ioloop 是否在运行 self._stopped = False # ioloop 是否停止 self._closing = False # ioloop 是否关闭 self._thread_ident = None # 当前线程堆标识符 ( thread identify ) self._blocking_signal_threshold = None # 系统信号, 主要用来在 epoll_wait 时判断是否会有 signal alarm 打断 epoll self._timeout_counter = itertools.count() # 超时计数器 ( 暂时不是很明白具体作用,好像和前面的 _cancellations 有关系? 请大神讲讲) self._waker = Waker() # 一个 waker 类,主要是对于管道 pipe 的操作,因为 ioloop 属于底层的数据操作,这里 epoll 监听的是 pipe self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ) # 将管道加入 epoll 监听,对于 web server 初始化时只需要关心 READ 事件 

    除了注释中的解释,还有几点补充:

    1. close_exec 的作用: 子进程在 fork 出来的时候,使用了写时复制( COW , Copy-On-Write )方式获得父进程的数据空间、 堆和栈副本,这其中也包括文件描述符。刚刚 fork 成功时,父子进程中相同的文件描述符指向系统文件表中的同一项,接着,一般我们会调用 exec 执行另一个程序,此时会用全新的程序替换子进程的正文,数据,堆和栈等。此时保存文件描述符的变量当然也不存在了,我们就无法关闭无用的文件描述符了。所以通常我们会 fork 子进程后在子进程中直接执行 close 关掉无用的文件描述符,然后再执行 exec 。 所以 close_exec 执行的其实就是 关闭 + 执行的作用。 详情可以查看: 关于 linux 进程间的 close-on-exec 机制

    2. Waker(): Waker 封装了对于管道 pipe 的操作:

      def set_close_exec(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFD) fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC) def _set_nonblocking(fd): flags = fcntl.fcntl(fd, fcntl.F_GETFL) fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK) class Waker(interface.Waker): def __init__(self): r, w = os.pipe() _set_nonblocking(r) _set_nonblocking(w) set_close_exec(r) set_close_exec(w) self.reader = os.fdopen(r, "rb", 0) self.writer = os.fdopen(w, "wb", 0) def fileno(self): return self.reader.fileno() def write_fileno(self): return self.writer.fileno() def wake(self): try: self.writer.write(b"x") except IOError: pass def consume(self): try: while True: result = self.reader.read() if not result: break except IOError: pass def close(self): self.reader.close() self.writer.close() ``` 

    可以看到 waker 把 pipe 分为读、 写两个管道并都设置了非阻塞和 close_exec。 注意wake(self)方法中:self.writer.write(b"x") 直接向管道中写入随意字符从而释放管道。

    原文地址

    作者:rapospectre

    1 条回复    2016-06-06 22:18:31 +08:00
    rapospectre
        1
    rapospectre  
    OP
       2016-06-06 22:18:31 +08:00
    噗,,还有一部分发不出来了,不能短时间内频繁发布主 @_@
    关于     帮助文档     自助推广系统     博客     API     FAQ     Solana     5462 人在线   最高记录 6679       Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 29ms UTC 03:28 PVG 11:28 LAX 19:28 JFK 22:28
    Do have faith in what you're doing.
    ubao msn snddm index pchome yahoo rakuten mypaper meadowduck bidyahoo youbao zxmzxm asda bnvcg cvbfg dfscv mmhjk xxddc yybgb zznbn ccubao uaitu acv GXCV ET GDG YH FG BCVB FJFH CBRE CBC GDG ET54 WRWR RWER WREW WRWER RWER SDG EW SF DSFSF fbbs ubao fhd dfg ewr dg df ewwr ewwr et ruyut utut dfg fgd gdfgt etg dfgt dfgd ert4 gd fgg wr 235 wer3 we vsdf sdf gdf ert xcv sdf rwer hfd dfg cvb rwf afb dfh jgh bmn lgh rty gfds cxv xcv xcs vdas fdf fgd cv sdf tert sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf sdf shasha9178 shasha9178 shasha9178 shasha9178 shasha9178 liflif2 liflif2 liflif2 liflif2 liflif2 liblib3 liblib3 liblib3 liblib3 liblib3 zhazha444 zhazha444 zhazha444 zhazha444 zhazha444 dende5 dende denden denden2 denden21 fenfen9 fenf619 fen619 fenfe9 fe619 sdf sdf sdf sdf sdf zhazh90 zhazh0 zhaa50 zha90 zh590 zho zhoz zhozh zhozho zhozho2 lislis lls95 lili95 lils5 liss9 sdf0ty987 sdft876 sdft9876 sdf09876 sd0t9876 sdf0ty98 sdf0976 sdf0ty986 sdf0ty96 sdf0t76 sdf0876 df0ty98 sf0t876 sd0ty76 sdy76 sdf76 sdf0t76 sdf0ty9 sdf0ty98 sdf0ty987 sdf0ty98 sdf6676 sdf876 sd876 sd876 sdf6 sdf6 sdf9876 sdf0t sdf06 sdf0ty9776 sdf0ty9776 sdf0ty76 sdf8876 sdf0t sd6 sdf06 s688876 sd688 sdf86