diff options
Diffstat (limited to 'libs/waitress/trigger.py')
-rw-r--r-- | libs/waitress/trigger.py | 203 |
1 files changed, 203 insertions, 0 deletions
diff --git a/libs/waitress/trigger.py b/libs/waitress/trigger.py new file mode 100644 index 000000000..24c4d0d6b --- /dev/null +++ b/libs/waitress/trigger.py @@ -0,0 +1,203 @@ +############################################################################## +# +# Copyright (c) 2001-2005 Zope Foundation and Contributors. +# All Rights Reserved. +# +# This software is subject to the provisions of the Zope Public License, +# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution. +# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED +# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS +# FOR A PARTICULAR PURPOSE +# +############################################################################## + +import errno +import os +import socket +import threading + +from . import wasyncore + +# Wake up a call to select() running in the main thread. +# +# This is useful in a context where you are using Medusa's I/O +# subsystem to deliver data, but the data is generated by another +# thread. Normally, if Medusa is in the middle of a call to +# select(), new output data generated by another thread will have +# to sit until the call to select() either times out or returns. +# If the trigger is 'pulled' by another thread, it should immediately +# generate a READ event on the trigger object, which will force the +# select() invocation to return. +# +# A common use for this facility: letting Medusa manage I/O for a +# large number of connections; but routing each request through a +# thread chosen from a fixed-size thread pool. When a thread is +# acquired, a transaction is performed, but output data is +# accumulated into buffers that will be emptied more efficiently +# by Medusa. [picture a server that can process database queries +# rapidly, but doesn't want to tie up threads waiting to send data +# to low-bandwidth connections] +# +# The other major feature provided by this class is the ability to +# move work back into the main thread: if you call pull_trigger() +# with a thunk argument, when select() wakes up and receives the +# event it will call your thunk from within that thread. The main +# purpose of this is to remove the need to wrap thread locks around +# Medusa's data structures, which normally do not need them. [To see +# why this is true, imagine this scenario: A thread tries to push some +# new data onto a channel's outgoing data queue at the same time that +# the main thread is trying to remove some] + + +class _triggerbase: + """OS-independent base class for OS-dependent trigger class.""" + + kind = None # subclass must set to "pipe" or "loopback"; used by repr + + def __init__(self): + self._closed = False + + # `lock` protects the `thunks` list from being traversed and + # appended to simultaneously. + self.lock = threading.Lock() + + # List of no-argument callbacks to invoke when the trigger is + # pulled. These run in the thread running the wasyncore mainloop, + # regardless of which thread pulls the trigger. + self.thunks = [] + + def readable(self): + return True + + def writable(self): + return False + + def handle_connect(self): + pass + + def handle_close(self): + self.close() + + # Override the wasyncore close() method, because it doesn't know about + # (so can't close) all the gimmicks we have open. Subclass must + # supply a _close() method to do platform-specific closing work. _close() + # will be called iff we're not already closed. + def close(self): + if not self._closed: + self._closed = True + self.del_channel() + self._close() # subclass does OS-specific stuff + + def pull_trigger(self, thunk=None): + if thunk: + with self.lock: + self.thunks.append(thunk) + self._physical_pull() + + def handle_read(self): + try: + self.recv(8192) + except OSError: + return + with self.lock: + for thunk in self.thunks: + try: + thunk() + except: + nil, t, v, tbinfo = wasyncore.compact_traceback() + self.log_info( + "exception in trigger thunk: (%s:%s %s)" % (t, v, tbinfo) + ) + self.thunks = [] + + +if os.name == "posix": + + class trigger(_triggerbase, wasyncore.file_dispatcher): + kind = "pipe" + + def __init__(self, map): + _triggerbase.__init__(self) + r, self.trigger = self._fds = os.pipe() + wasyncore.file_dispatcher.__init__(self, r, map=map) + + def _close(self): + for fd in self._fds: + os.close(fd) + self._fds = [] + wasyncore.file_dispatcher.close(self) + + def _physical_pull(self): + os.write(self.trigger, b"x") + + +else: # pragma: no cover + # Windows version; uses just sockets, because a pipe isn't select'able + # on Windows. + + class trigger(_triggerbase, wasyncore.dispatcher): + kind = "loopback" + + def __init__(self, map): + _triggerbase.__init__(self) + + # Get a pair of connected sockets. The trigger is the 'w' + # end of the pair, which is connected to 'r'. 'r' is put + # in the wasyncore socket map. "pulling the trigger" then + # means writing something on w, which will wake up r. + + w = socket.socket() + # Disable buffering -- pulling the trigger sends 1 byte, + # and we want that sent immediately, to wake up wasyncore's + # select() ASAP. + w.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + + count = 0 + while True: + count += 1 + # Bind to a local port; for efficiency, let the OS pick + # a free port for us. + # Unfortunately, stress tests showed that we may not + # be able to connect to that port ("Address already in + # use") despite that the OS picked it. This appears + # to be a race bug in the Windows socket implementation. + # So we loop until a connect() succeeds (almost always + # on the first try). See the long thread at + # http://mail.zope.org/pipermail/zope/2005-July/160433.html + # for hideous details. + a = socket.socket() + a.bind(("127.0.0.1", 0)) + connect_address = a.getsockname() # assigned (host, port) pair + a.listen(1) + try: + w.connect(connect_address) + break # success + except OSError as detail: + if detail[0] != errno.WSAEADDRINUSE: + # "Address already in use" is the only error + # I've seen on two WinXP Pro SP2 boxes, under + # Pythons 2.3.5 and 2.4.1. + raise + # (10048, 'Address already in use') + # assert count <= 2 # never triggered in Tim's tests + if count >= 10: # I've never seen it go above 2 + a.close() + w.close() + raise RuntimeError("Cannot bind trigger!") + # Close `a` and try again. Note: I originally put a short + # sleep() here, but it didn't appear to help or hurt. + a.close() + + r, addr = a.accept() # r becomes wasyncore's (self.)socket + a.close() + self.trigger = w + wasyncore.dispatcher.__init__(self, r, map=map) + + def _close(self): + # self.socket is r, and self.trigger is w, from __init__ + self.socket.close() + self.trigger.close() + + def _physical_pull(self): + self.trigger.send(b"x") |