脚本专栏 
首页 > 脚本专栏 > 浏览文章

Python实现异步IO的示例

(编辑:jimmy 日期: 2025/1/7 浏览:3 次 )

前言

  用阻塞 API 写同步代码最简单,但一个线程同一时间只能处理一个请求,有限的线程数导致无法实现万级别的并发连接,过多的线程切换也抢走了 CPU 的时间,从而降低了每秒能够处理的请求数量。为了达到高并发,你可能会选择一个异步框架,用非阻塞 API 把业务逻辑打乱到多个回调函数,通过多路复用与事件循环的方式实现高并发。

磁盘 IO 为例,描述了多线程中使用阻塞方法读磁盘,2 个线程间的切换方式。那么,怎么才能实现高并发呢?

Python实现异步IO的示例

把上图中本来由内核实现的请求切换工作,交由用户态的代码来完成就可以了,异步化编程通过应用层代码实现了请求切换,降低了切换成本和内存占用空间。异步化依赖于 IO 多路复用机制,比如 Linux 的 epoll 或者 Windows 上的 iocp,同时,必须把阻塞方法更改为非阻塞方法,才能避免内核切换带来的巨大消耗。Nginx、Redis 等高性能服务都依赖异步化实现了百万量级的并发。

下图描述了异步 IO 的非阻塞读和异步框架结合后,是如何切换请求的。

Python实现异步IO的示例

然而,写异步化代码很容易出错。因为所有阻塞函数,都需要通过非阻塞的系统调用拆分成两个函数。虽然这两个函数共同完成一个功能,但调用方式却不同。第一个函数由你显式调用,第二个函数则由多路复用机制调用。

这种方式违反了软件工程的内聚性原则,函数间同步数据也更复杂。特别是条件分支众多、涉及大量系统调用时,异步化的改造工作会非常困难。

Python如何实现异步调用

from flask import Flask
import time
app = Flask(__name__)


@app.route('/bar')
def bar():
  time.sleep(1)
  return '<h1>bar!</h1>'

@app.route('/foo')
def foo():
  time.sleep(1)
  return '<h1>foo!</h1>'
if __name__ == '__main__':
  app.run(host='127.0.0.1',port=5555,debug=True)

采用同步的方式调用

import requests
import time

starttime = time.time()
print(requests.get('http://127.0.0.1:5555/bar').content)
print(requests.get('http://127.0.0.1:5555/foo').content)
print("消耗时间: ",time.time() -starttime)

b'<h1>bar!</h1>'
b'<h1>foo!</h1>'
消耗时间:  2.015509605407715

采样异步的方式调用:

重点:

1.将阻塞io改为非阻塞io;

2.多路复用io监听内核事件,事件触发通过回调函数;

3.用户态代码采取事件循环的方式获取事件,执行事件的回调函数;

import selectors
import socket
import time
# from asynrequest import ParserHttp
class asynhttp:
  def __init__(self):
    self.selecter = selectors.DefaultSelector()

  def get(self,url,optiondict = None):
    global reqcount
    reqcount += 1
    s = socket.socket()
    s.setblocking(False)
    try:
      s.connect(('127.0.0.1',5555))
    except BlockingIOError:
      pass
    requset = 'GET %s HTTP/1.0\r\n\r\n' % url
    callback = lambda : self.send(s,requset)
    self.selecter.register(s.fileno(),selectors.EVENT_WRITE,callback)

  def send(self,s,requset):
    self.selecter.unregister(s.fileno())
    s.send(requset.encode())
    chunks = []
    callback = lambda: self.recv(s,chunks)
    self.selecter.register(s.fileno(),selectors.EVENT_READ,callback)

  def recv(self,s,chunks):
    self.selecter.unregister(s.fileno())
    chunk = s.recv(1024)
    if chunk:
      chunks.append(chunk)
      callback = lambda: self.recv(s,chunks)
      self.selecter.register(s.fileno(), selectors.EVENT_READ, callback)
    else:
      global reqcount
      reqcount -= 1
      request_first,request_headers,request_content,_ = ParserHttp.parser(b''.join(chunks))
      print("解析数据:",request_first,request_headers,request_content)
      print((b''.join(chunks)).decode())
      return (b''.join(chunks)).decode()

starttime = time.time()
reqcount = 0
asynhttper = asynhttp()
asynhttper.get('/bar')
asynhttper.get('/foo')
while reqcount:
  events = asynhttper.selecter.select()
  for event,mask in events:
    func = event.data
    func()
print("消耗时间:" ,time.time() - starttime)

HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu, 15 Oct 2020 03:28:16 GMT

<h1>bar!</h1>
HTTP/1.0 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 13
Server: Werkzeug/1.0.1 Python/3.7.7
Date: Thu, 15 Oct 2020 03:28:16 GMT

<h1>foo!</h1>
消耗时间: 1.0127637386322021

以上就是Python实现异步IO的示例的详细内容,更多关于python 异步IO的资料请关注其它相关文章!

上一篇:Python Serial串口基本操作(收发数据)
下一篇:python基于exchange函数发送邮件过程详解
一句话新闻
一文看懂荣耀MagicBook Pro 16
荣耀猎人回归!七大亮点看懂不只是轻薄本,更是游戏本的MagicBook Pro 16.
人们对于笔记本电脑有一个固有印象:要么轻薄但性能一般,要么性能强劲但笨重臃肿。然而,今年荣耀新推出的MagicBook Pro 16刷新了人们的认知——发布会上,荣耀宣布猎人游戏本正式回归,称其继承了荣耀 HUNTER 基因,并自信地为其打出“轻薄本,更是游戏本”的口号。
众所周知,寻求轻薄本的用户普遍更看重便携性、外观造型、静谧性和打字办公等用机体验,而寻求游戏本的用户则普遍更看重硬件配置、性能释放等硬核指标。把两个看似难以相干的产品融合到一起,我们不禁对它产生了强烈的好奇:作为代表荣耀猎人游戏本的跨界新物种,它究竟做了哪些平衡以兼顾不同人群的各类需求呢?