先简单介绍下各种 IO 模型: 最容易做的是阻塞 IO,即读写数据时,需要等待操作完成,才能继续执行。进阶的做法就是用多线程来处理需要 IO 的部分,缺点是开销会有些大。 接着是非阻塞 IO,即读写数据时,如果暂时不可读写,则立刻返回,而不等待。因为不知道什么时候是可读写的,所以轮询时可能会浪费 CPU 时间。 然后是 IO 复用,即在读写数据前,先检查哪些描述符是可读写的,再去读写。select 和 poll 就是这样做的,它们会遍历所有被监视的描述符,查看是否满足,这个检查的过程是阻塞的。而 epoll、kqueue 和 /dev/poll 则做了些改进,事先注册需要检查哪些描述符的哪些事件,当状态发生变化时,内核会调用对应的回调函数,将这些描述符保存下来;下次获取可用的描述符时,直接返回这些发生变化的描述符即可。 再之后是信号驱动,即描述符就绪时,内核发送 SIGIO 信号,再由信号处理程序去处理这些信号即可。不过信号处理的时机是从内核态返回用户态时,感觉也得把这些事件收集起来才好处理,有点像模拟 IO 复用了。 最后是异步 IO,即读写数据时,只注册事件,内核完成读写后(读取的数据会复制到用户态),再调用事件处理函数。这整个过程都不会阻塞调用线程,不过实现它的操作系统比较少,Windows 上有比较成熟的 IOCP,Linux 上的 AIO 则有不少缺点。 虽然真正的异步 IO 需要中间任何步骤都没有阻塞,这对于某些只是偶尔需要处理 IO 请求的情况确实有用(比如文本编辑器偶尔保存一下文件);但对于服务器端编程的大多数情况而言,它的主线程就是用来处理 IO 请求的,如果在空闲时不阻塞在 IO 等待上,也没有别的事情能做,所以本文就不纠结这个异步是否名副其实了。 在 Python 2 的时代,高性能的网络编程主要是使用 Twisted、Tornado 和 gevent 这三个库。 我对 Twisted 不熟,只知道它的缺点是比较重,性能相对而言并不算好。 Tornado 平时用得比较多,缺点是写异步调用时特别麻烦。 gevent 我只能算接触过,缺点是不太干净。 由于它们都各自有一个 IO loop,不好混用,而 Tornado 的 web 框架相对而言比较完善,因此成了我的首选。 而从 Python 3.4 开始,标准库里又新增了 asyncio 这个模块。 从原理上来说,它和 Tornado 其实差不多,都是注册 IO 事件,然后在 IO loop 中等待事件发生,然后调用相应的处理函数。 不同之处在于 Python 3 增加了一些新的特性,而 Tornado 需要兼容 Python 2,所以写起来会比较麻烦。 举例来说,Python 3.3 可以在 generator 中 return 返回值(相当于 raise StopIteration),而 Tornado 中需要 raise 一个 Return 对象。此外,Python 3.3 还增加了 yield from 语法,减轻了在 generator 中处理另一个 generator 的工作量(省去了循环和 try … except …)。 不过,虽然 asyncio 有那么多得天独厚的优势,却不一定比 Tornado 的性能更好,所以我写个简单的例子测试一下。 比较方法就是写个最简单的 HTTP 服务器,不做任何检查,读取到任何内容都输出一个 hello world,并断开连接。 测试的客户端就懒得写了,直接用 ab 即可: ab -n 10000 -c 10 "http://0.0.0.0:8000/" ab -n 10000 -c 10 "http://0.0.0.0:8000/" Tornado 版是这样:
from tornado.gen import coroutine
from tornado.ioloop import IOLoop
from tornado.tcpserver import TCPServer
class Server(TCPServer):
@coroutine
def handle_stream(self, stream, address):
try:
yield stream.read_bytes(1024, partial=True)
yield stream.write(b'HTTP 1.0 200 OKrnrnhello world')
finally:
stream.close()
server = Server()
server.bind(8000)
server.start(1)
IOLoop.current().start()
fromtornado.genimportcoroutine fromtornado.ioloopimportIOLoop fromtornado.tcpserverimportTCPServer class Server(TCPServer): @coroutine defhandle_stream(self, stream, address): try: yieldstream.read_bytes(1024, partial=True) yieldstream.write(b'HTTP 1.0 200 OKrnrnhello world') finally: stream.close() server = Server() server.bind(8000) server.start(1) IOLoop.current().start() 在我的电脑上大概 4000 QPS。 asyncio 版是这样:
import asyncio
class Server(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
try:
self.transport.write(b'HTTP/1.1 200 OKrnrnhello world')
finally:
self.transport.close()
loop = asyncio.get_event_loop()
server = loop.create_server(Server, '', 8000)
loop.run_until_complete(server)
loop.run_forever()
importasyncio class Server(asyncio.Protocol): defconnection_made(self, transport): self.transport = transport defdata_received(self, data): try: self.transport.write(b'HTTP/1.1 200 OKrnrnhello world') finally: self.transport.close() loop = asyncio.get_event_loop() server = loop.create_server(Server, '', 8000) loop.run_until_complete(server) loop.run_forever() 在我的电脑上大概 3000 QPS,比 Tornado 版慢了一些。此外,asyncio 的 transport 在 write 时不用 yield from,这点可能有些不一致。 asyncio 还有个高级版的 API:
import asyncio
@asyncio.coroutine
def handle(reader, writer):
yield from reader.read(1024)
writer.write(b'HTTP/1.1 200 OKrnrnhello world')
yield from writer.drain()
writer.close()
loop = asyncio.get_event_loop()
task = asyncio.start_server(handle, '', 8000, loop=loop)
server = loop.run_until_complete(task)
loop.run_forever()
importasyncio @asyncio.coroutine defhandle(reader, writer): yieldfromreader.read(1024) writer.write(b'HTTP/1.1 200 OKrnrnhello world') yieldfromwriter.drain() writer.close() loop = asyncio.get_event_loop() task = asyncio.start_server(handle, '', 8000, loop=loop) server = loop.run_until_complete(task) loop.run_forever() 在我的电脑上大概 2200 QPS。这下读写都要 yield from 了,一致性上来说会好些。 以框架的性能而言,其实都够用,开销都不超过 1 毫秒,而 web 请求一般都需要 10 毫秒的以上的处理时间。 于是顺便再测一下和 MySQL 的搭配,即在每个请求内调用一下 SELECT 1,然后输出返回值。 因为自己懒得写客户端了,于是就用现成的 tornado_mysql 和 aiomysql 来测试了。原理应该都差不多,发送写请求后就返回,等收到可读事件时再获取内容。 Tornado 版是这样:
from tornado.gen import coroutine
from tornado.ioloop import IOLoop
from tornado.tcpserver import TCPServer
from tornado_mysql import pools
class Server(TCPServer):
@coroutine
def handle_stream(self, stream, address):
try:
yield stream.read_bytes(1024, partial=True)
cursor = yield POOL.execute(b'SELECT 1')
data = cursor.fetchone()
yield stream.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(data).encode()) # Python 3.5 的 bytes 才能用 % 格式化
finally:
stream.close()
POOL = pools.Pool(
dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='mysql'),
max_idle_connections=10,
max_open_connections=10)
server = Server()
server.bind(8000)
server.start(1)
IOLoop.current().start()
fromtornado.genimportcoroutine
fromtornado.ioloopimportIOLoop
fromtornado.tcpserverimportTCPServer
fromtornado_mysqlimportpools
class Server(TCPServer):
@coroutine
defhandle_stream(self, stream, address):
try:
yieldstream.read_bytes(1024, partial=True)
cursor = yieldPOOL.execute(b'SELECT 1')
data = cursor.fetchone()
yieldstream.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(data).encode()) # Python 3.5 的 bytes 才能用 % 格式化
finally:
stream.close()
POOL = pools.Pool(
dict(host='127.0.0.1', port=3306, user='root', passwd='123', db='mysql'),
max_idle_connections=10,
max_open_connections=10)
server = Server()
server.bind(8000)
server.start(1)
IOLoop.current().start()
在我的电脑上大概 680 QPS。 asyncio 版是这样:
import asyncio
import aiomysql
class Server(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
class Server(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
@asyncio.coroutine
def handle():
with (yield from pool) as conn:
cursor = yield from conn.cursor()
yield from cursor.execute(b'SELECT 1')
result = yield from cursor.fetchone()
try:
self.transport.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(result).encode())
finally:
self.transport.close()
loop.create_task(handle()) # 或者 asyncio.async(handle())
@asyncio.coroutine
def get_pool():
return(yield from aiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123', loop=loop))
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(get_pool())
server = loop.create_server(Server, '', 8000)
loop.run_until_complete(server)
loop.run_forever()
importasyncio
importaiomysql
class Server(asyncio.Protocol):
defconnection_made(self, transport):
self.transport = transport
class Server(asyncio.Protocol):
defconnection_made(self, transport):
self.transport = transport
defdata_received(self, data):
@asyncio.coroutine
defhandle():
with (yieldfrompool) as conn:
cursor = yieldfromconn.cursor()
yieldfromcursor.execute(b'SELECT 1')
result = yieldfromcursor.fetchone()
try:
self.transport.write('HTTP/1.1 200 OKrnrn{0[0]}'.format(result).encode())
finally:
self.transport.close()
loop.create_task(handle()) # 或者 asyncio.async(handle())
@asyncio.coroutine
defget_pool():
return(yieldfromaiomysql.create_pool(host='127.0.0.1', port=3306, user='root', password='123', loop=loop))
loop = asyncio.get_event_loop()
pool = loop.run_until_complete(get_pool())
server = loop.create_server(Server, '', 8000)
loop.run_until_complete(server)
loop.run_forever()
在我的电脑上大概 1250 QPS,比 Tornado 版快了不少。不过写起来比较蛋疼,因为 data_received 方法里不能直接用 yield from。 用 cProfile 看了下,Tornado 版在 tornado.gen 和 functools 模块里花了不少时间,可能是异步调用过多了吧。但如果不做异步库的开发者,而只就使用者的体验而言,Tornado 会显得更加灵活和易用。不过 asyncio 的高级 API 应该也能提供类似的体验。 顺便再用底层 socket 模块写个服务器试试。先用 poll 看看,错误处理什么的就先不做了:
from functools import partial
import select
import socket
class Server:
def __init__(self):
self._sock = socket.socket()
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
handlers = self._handlers
poll = self._poll
self.add_handler(sock.fileno(), self._accept, select.POLLIN)
while True:
poll_events = poll.poll(1)
for fd, event in poll_events:
handler = handlers.get(fd)
if handler:
handler()
def _accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
except OSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, partial(self._read, conn), select.POLLIN)
def _read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, partial(self._write, conn), select.POLLOUT)
def _write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
def add_handler(self, fd, handler, event):
self._handlers[fd] = handler
self.register(fd, event)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
def register(self, fd, event):
if fd in self._fd_events:
raise IOError("fd %s already registered" % fd)
self._poll.register(fd, event)
self._fd_events[fd] = event
def unregister(self, fd):
event = self._fd_events.pop(fd, None)
if event is not None:
self._poll.unregister(fd)
Server().start()
fromfunctoolsimportpartial
importselect
importsocket
class Server:
def__init__(self):
self._sock = socket.socket()
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
defstart(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
handlers = self._handlers
poll = self._poll
self.add_handler(sock.fileno(), self._accept, select.POLLIN)
while True:
poll_events = poll.poll(1)
for fd, eventin poll_events:
handler = handlers.get(fd)
if handler:
handler()
def_accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
exceptOSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, partial(self._read, conn), select.POLLIN)
def_read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, partial(self._write, conn), select.POLLOUT)
def_write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
defadd_handler(self, fd, handler, event):
self._handlers[fd] = handler
self.register(fd, event)
defremove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
defregister(self, fd, event):
if fdin self._fd_events:
raiseIOError("fd %s already registered" % fd)
self._poll.register(fd, event)
self._fd_events[fd] = event
defunregister(self, fd):
event = self._fd_events.pop(fd, None)
if eventis not None:
self._poll.unregister(fd)
Server().start()
在我的电脑上大概 7700 QPS,优势巨大。 再用 kqueue 试试(我用的是 OS X):
from functools import partial
import select
import socket
class Server:
def __init__(self):
self._sock = socket.socket()
self._kqueue = select.kqueue()
self._handlers = {}
self._fd_events = {}
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
self.add_handler(sock.fileno(), self._accept, select.KQ_FILTER_READ)
handlers = self._handlers
while True:
kevents = self._kqueue.control(None, 1000, 1)
for kevent in kevents:
fd = kevent.ident
handler = handlers.get(fd)
if handler:
handler()
def _accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
except OSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, partial(self._read, conn), select.KQ_FILTER_READ)
def _read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, partial(self._write, conn), select.KQ_FILTER_WRITE)
def _write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
def add_handler(self, fd, handler, event):
self._handlers[fd] = handler
self.register(fd, event)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
def register(self, fd, event):
if fd in self._fd_events:
raise IOError("fd %s already registered" % fd)
self._control(fd, event, select.KQ_EV_ADD)
self._fd_events[fd] = event
def unregister(self, fd):
event = self._fd_events.pop(fd, None)
if event is not None:
self._control(fd, event, select.KQ_EV_DELETE)
def _control(self, fd, event, flags):
change_list = (select.kevent(fd, event, flags),)
self._kqueue.control(change_list, 0)
Server().start()
fromfunctoolsimportpartial
importselect
importsocket
class Server:
def__init__(self):
self._sock = socket.socket()
self._kqueue = select.kqueue()
self._handlers = {}
self._fd_events = {}
defstart(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
self.add_handler(sock.fileno(), self._accept, select.KQ_FILTER_READ)
handlers = self._handlers
while True:
kevents = self._kqueue.control(None, 1000, 1)
for keventin kevents:
fd = kevent.ident
handler = handlers.get(fd)
if handler:
handler()
def_accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
exceptOSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, partial(self._read, conn), select.KQ_FILTER_READ)
def_read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, partial(self._write, conn), select.KQ_FILTER_WRITE)
def_write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
defadd_handler(self, fd, handler, event):
self._handlers[fd] = handler
self.register(fd, event)
defremove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
defregister(self, fd, event):
if fdin self._fd_events:
raiseIOError("fd %s already registered" % fd)
self._control(fd, event, select.KQ_EV_ADD)
self._fd_events[fd] = event
defunregister(self, fd):
event = self._fd_events.pop(fd, None)
if eventis not None:
self._control(fd, event, select.KQ_EV_DELETE)
def_control(self, fd, event, flags):
change_list = (select.kevent(fd, event, flags),)
self._kqueue.control(change_list, 0)
Server().start()
在我的电脑上大概 7200 QPS,比 poll 版稍慢。不过因为只有 10 个并发连接,而且没有慢速网络的影响,所以 poll 的性能好并不奇怪。 再试试 Python 3.4 新增的 selectors 模块,它的 DefaultSelector 会自动选择所在平台最高效的实现,asyncio 就用到了这个模块。
import selectors
import socket
class Server:
def __init__(self):
self._sock = socket.socket()
self._selector = selectors.DefaultSelector()
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
selector = self._selector
self.add_handler(sock.fileno(), self._accept, selectors.EVENT_READ)
while True:
events = selector.select(1)
for key, event in events:
handler, data = key.data
if data:
handler(**data)
else:
handler()
def _accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
except OSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, self._read, selectors.EVENT_READ, {'conn': conn})
def _read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, self._write, selectors.EVENT_WRITE, {'conn': conn})
def _write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
def add_handler(self, fd, handler, event, data=None):
self._selector.register(fd, event, (handler, data))
def remove_handler(self, fd):
self._selector.unregister(fd)
Server().start()
importselectors
importsocket
class Server:
def__init__(self):
self._sock = socket.socket()
self._selector = selectors.DefaultSelector()
defstart(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
selector = self._selector
self.add_handler(sock.fileno(), self._accept, selectors.EVENT_READ)
while True:
events = selector.select(1)
for key, eventin events:
handler, data = key.data
if data:
handler(**data)
else:
handler()
def_accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
exceptOSError:
break
else:
conn.setblocking(0)
fd = conn.fileno()
self.add_handler(fd, self._read, selectors.EVENT_READ, {'conn': conn})
def_read(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.recv(1024)
except:
conn.close()
raise
else:
self.add_handler(fd, self._write, selectors.EVENT_WRITE, {'conn': conn})
def_write(self, conn):
fd = conn.fileno()
self.remove_handler(fd)
try:
conn.send(b'HTTP 1.0 200 OKrnrnhello world')
finally:
conn.close()
defadd_handler(self, fd, handler, event, data=None):
self._selector.register(fd, event, (handler, data))
defremove_handler(self, fd):
self._selector.unregister(fd)
Server().start()
在我的电脑上大概 6100 QPS,成绩也还不错。 从这些测试来看,如果想自己实现一个舍弃了一些功能和兼容性的 Tornado,应该能比它稍快一点,不过似乎没多大必要。所以暂时不纠结性能了,还是从使用的便利性上来考虑。Tornado 可以用 yield 取代 callback,我们也来实现这个 feature。 实现前先得了解下 yield。 当一个函数内部出现了 yield 语句时,它就不再是一个单纯的函数了,而是一个生成器函数,调用它并不会执行它的代码,而是返回一个生成器。 调用这个生成器的 send 方法时,才会执行内部的代码。当执行到 yield 时,这个 send 方法就返回了,调用者可以得到其返回值。 send 方法在第一次调用时,参数必须为 None。Python 2 中可以用它的 next 方法,Python 3 中改成了 __next__ 方法,还可以用内置的 next 函数来调用。 send 方法可以被多次调用,参数会作为 yield 的返回值,回到生成器内上一次执行的地方,并继续执行下去。 当生成器的代码执行完时,会抛出一个 StopIteration 的异常。Python 3.3 开始可以在生成器里使用 return,返回值可以从 StopIteration 异常的 value 属性获取。 for … in … 循环会自动捕获 StopIteration 异常,并作为循环停止的条件。 由此可见,yield 可以用于跳转。而我们要做的,则是在遇到 IO 请求时,用 yield 返回 IO loop;当事件发生时,找到对应的生成器,用 send 方法继续执行即可。为了简单起见,我就在 poll 版的基础上进行改造了:
from collections import deque
import select
import socket
from types import GeneratorType
class Stream:
def __init__(self, sock, loop):
sock.setblocking(0)
self._sock = sock
self._loop = loop
def close(self):
self._sock.close()
def read(self, size=1024):
sock = self._sock
fd = sock.fileno()
try:
data = sock.recv(size)
except OSError as e:
if e.errno == socket.EAGAIN or socket.EWOULDBLOCK:
self._loop.add_handler(fd, self.read(size), select.POLLIN)
yield
else:
raise
else:
return data
finally:
self._loop.remove_handler(fd)
def write(self, data):
sock = self._sock
fd = sock.fileno()
try:
try:
sent_bytes = sock.send(data)
except OSError as e:
if e.errno not in (socket.EAGAIN, socket.EWOULDBLOCK):
raise
else:
if sent_bytes == len(data):
return
data = data[sent_bytes:]
self._loop.add_handler(fd, self.write(data), select.POLLOUT)
yield
while data:
try:
sent_bytes = sock.send(data)
except OSError as e:
if e.errno not in (socket.EAGAIN, socket.EWOULDBLOCK):
raise
else:
if sent_bytes == len(data):
return
data = data[sent_bytes:]
yield
finally:
self._loop.remove_handler(fd)
class IOLoop:
def __init__(self):
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
def start(self):
handlers = self._handlers
poll = self._poll
while True:
poll_events = poll.poll(1)
for fd, event in poll_events:
handler = handlers.get(fd)
if handler:
if callable(handler):
handler()
else:
stack = handler
while True:
generator, value = stack[-1]
try:
value = generator.send(value)
if isinstance(value, GeneratorType):
stack.append([value, None])
else:
break
except StopIteration as e:
stack.pop()
if stack:
stack[-1][-1] = e.value
else:
break
def add_handler(self, fd, handler, event):
if isinstance(handler, GeneratorType):
self._handlers[fd] = deque([[handler, None]])
else:
self._handlers[fd] = handler
self.register(fd, event)
def remove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
def update_handler(self, fd, event):
self.modify(fd, event)
def register(self, fd, event):
if fd in self._fd_events:
raise IOError("fd %s already registered" % fd)
self._poll.register(fd, event)
self._fd_events[fd] = event
def unregister(self, fd):
event = self._fd_events.pop(fd, None)
if event is not None:
self._poll.unregister(fd)
def modify(self, fd, event):
self._poll.modify(fd, event)
self._fd_events[fd] = event
class Server:
def __init__(self):
self._sock = socket.socket()
self._loop = IOLoop()
self._stream = Stream(self._sock, self._loop)
def start(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
self._loop.add_handler(sock.fileno(), self._accept, select.POLLIN)
self._loop.start()
def _accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
except OSError:
break
else:
stream = Stream(conn, self._loop)
fd = conn.fileno()
self._loop.add_handler(fd, self._handle(stream), select.POLLIN)
def _handle(self, stream):
yield stream.read()
yield stream.write(b'HTTP 1.0 200 OK\r\n\r\nhello world')
Server().start()
fromcollectionsimportdeque
importselect
importsocket
fromtypesimportGeneratorType
class Stream:
def__init__(self, sock, loop):
sock.setblocking(0)
self._sock = sock
self._loop = loop
defclose(self):
self._sock.close()
defread(self, size=1024):
sock = self._sock
fd = sock.fileno()
try:
data = sock.recv(size)
exceptOSErroras e:
if e.errno == socket.EAGAINor socket.EWOULDBLOCK:
self._loop.add_handler(fd, self.read(size), select.POLLIN)
yield
else:
raise
else:
return data
finally:
self._loop.remove_handler(fd)
defwrite(self, data):
sock = self._sock
fd = sock.fileno()
try:
try:
sent_bytes = sock.send(data)
exceptOSErroras e:
if e.errnonot in (socket.EAGAIN, socket.EWOULDBLOCK):
raise
else:
if sent_bytes == len(data):
return
data = data[sent_bytes:]
self._loop.add_handler(fd, self.write(data), select.POLLOUT)
yield
while data:
try:
sent_bytes = sock.send(data)
exceptOSErroras e:
if e.errnonot in (socket.EAGAIN, socket.EWOULDBLOCK):
raise
else:
if sent_bytes == len(data):
return
data = data[sent_bytes:]
yield
finally:
self._loop.remove_handler(fd)
class IOLoop:
def__init__(self):
self._poll = select.poll()
self._handlers = {}
self._fd_events = {}
defstart(self):
handlers = self._handlers
poll = self._poll
while True:
poll_events = poll.poll(1)
for fd, eventin poll_events:
handler = handlers.get(fd)
if handler:
if callable(handler):
handler()
else:
stack = handler
while True:
generator, value = stack[-1]
try:
value = generator.send(value)
if isinstance(value, GeneratorType):
stack.append([value, None])
else:
break
exceptStopIterationas e:
stack.pop()
if stack:
stack[-1][-1] = e.value
else:
break
defadd_handler(self, fd, handler, event):
if isinstance(handler, GeneratorType):
self._handlers[fd] = deque([[handler, None]])
else:
self._handlers[fd] = handler
self.register(fd, event)
defremove_handler(self, fd):
self._handlers.pop(fd, None)
self.unregister(fd)
defupdate_handler(self, fd, event):
self.modify(fd, event)
defregister(self, fd, event):
if fdin self._fd_events:
raiseIOError("fd %s already registered" % fd)
self._poll.register(fd, event)
self._fd_events[fd] = event
defunregister(self, fd):
event = self._fd_events.pop(fd, None)
if eventis not None:
self._poll.unregister(fd)
defmodify(self, fd, event):
self._poll.modify(fd, event)
self._fd_events[fd] = event
class Server:
def__init__(self):
self._sock = socket.socket()
self._loop = IOLoop()
self._stream = Stream(self._sock, self._loop)
defstart(self):
sock = self._sock
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(('', 8000))
sock.listen(100)
self._loop.add_handler(sock.fileno(), self._accept, select.POLLIN)
self._loop.start()
def_accept(self):
for i in range(100):
try:
conn, address = self._sock.accept()
exceptOSError:
break
else:
stream = Stream(conn, self._loop)
fd = conn.fileno()
self._loop.add_handler(fd, self._handle(stream), select.POLLIN)
def_handle(self, stream):
yieldstream.read()
yieldstream.write(b'HTTP 1.0 200 OK\r\n\r\nhello world')
Server().start()
在我的电脑上大概 5300 QPS。 虽然成绩比较尴尬,但毕竟用起来比前一个版本好多了。至于慢的原因,我估计是自己维护了一个堆栈的原因(也可能是有什么 bug,毕竟写这个感觉太跳跃了,能运行起来就谢天谢地了)。实现时做了两点假设:
实现细节也没什么好说的了,只是觉得在实现 Stream 的 read / write 方法时,调用 IOLoop.add_handler 方法不太优雅。其实可以直接 yield 一个 fd 和 event,在 IOLoop.start 方法中再去注册。不过这个重构其实蛮小的,我就不再贴一次代码了,感兴趣的可以自己试试。 (责任编辑:最模板) |

Destoon B2B行业分类29328条阿
人气:3376
ecshop仿易视眼镜网模板整
人气:766
OLOMO欧莱诺ecshop模板
人气:734
Ecmall仿亚马逊英文外贸多
人气:1755
ecshop家具模板|家具商城程
人气:418
ecshop嘀嗒猫零食商城模板
人气:1037