完全掌握 Python 中的 Asyncio 异步编程

Administrator
发布于 2024-08-19 / 15 阅读
0
0

完全掌握 Python 中的 Asyncio 异步编程

在 Python 的世界里,asyncio 是处理现代 Web 和网络任务的一颗璀璨明珠。这个工具包是 Python 对编写用于并发 I/O 操作的简洁、高效和可扩展代码的解决方案。它可能一开始听起来有点吓人,因为它涉及事件循环、协程和 future。但一旦你掌握了它,你会想知道没有它你是如何生活的。所以,让我们一步一步地分解它,通过示例来了解异步世界的样子。

理解 Asyncio

在深入研究示例之前,首先要掌握 asyncio 的核心概念:

  • 事件循环:asyncio 提供的中央执行设备。它管理和分配不同任务的执行。它负责处理事件和调度异步例程。
  • 协程: 使用 async def 声明的异步函数。这些函数可以在 await 点暂停和恢复,允许 I/O 操作在后台运行。
  • Futures: 表示尚未完成的工作结果的对象。它们从事件循环调度的任务中返回。
  • 任务: 由事件循环包装到 Future 对象中的已调度协程,允许它们执行。

Asyncio 入门

首先,asyncio 的核心是编写可以同时执行多项操作的代码,而实际上并没有同时执行它们。这就像厨房里有一位厨师,他开始炖汤,他知道这需要时间来炖,所以他开始准备沙拉,而不是只是站在那里。这就是异步编程的本质——在不必要等待的情况下保持高效地运转。

关键字 await

Python 中的 await 关键字是异步编程的重要组成部分,它是在 Python 3.5 中引入的。它用于暂停 async 函数的执行,直到一个可等待对象(如协程、任务、Futures 或 I/O)完成,从而允许其他任务在此期间运行。这个关键特性可以有效地处理 I/O 密集型和高级结构化网络代码。

理解 await

  • 上下文: await 只能在 async 函数中使用。尝试在这样的上下文之外使用它会导致语法错误。
  • 目的: 它的主要目的是将控制权交还给事件循环,暂停封闭协程的执行,直到等待的对象被解析。这种非阻塞行为使得异步编程非常高效,特别是对于 I/O 密集型任务。
  • 可等待对象: 可以与 await 一起使用的对象必须是可等待的。最常见的可等待对象是用 async def 声明的协程,但其他的还包括 asyncio 任务、Futures 或任何具有 __await__() 方法的对象。

示例

“你好,异步世界!”

假设你的任务是在 2 秒钟的暂停后打印 “你好,异步世界!”。同步方法很简单:

import time

def say_hello():
    time.sleep(2)
    print("你好,异步世界?(还没呢)")

say_hello()

它完成了任务,但在等待这 2 秒钟的过程中,一切都停止了。

现在,让我们切换到 asyncio,展示异步的方式:

import asyncio

async def say_hello_async():
    await asyncio.sleep(2)
    print("你好,异步世界!")

asyncio.run(say_hello_async())

使用 asyncio,在我们等待的时候,事件循环可以做其他的事情,比如检查邮件或者播放音乐,这使得我们的代码是非阻塞的,效率更高:

import asyncio

async def say_hello_async():
    await asyncio.sleep(2)  # 模拟等待 2 秒
    print("你好,异步世界!")

async def do_something_else():
    print("开始另一个任务...")
    await asyncio.sleep(1)  # 模拟做其他事情 1 秒
    print("完成另一个任务!")

async def main():
    # 调度两个任务同时运行
    await asyncio.gather(
        say_hello_async(),
        do_something_else(),
    )

asyncio.run(main())

在这个修改后的版本中,main() 函数使用 asyncio.gather() 来同时运行 say_hello_async()do_something_else()。这意味着,当程序等待 say_hello_async() 函数完成其 2 秒的休眠时,它会启动并可能完成 do_something_else() 函数,在等待时间内有效地执行另一个任务。

获取网页(并发 I/O 任务)

获取网页是展示异步编程强大功能的经典例子。让我们比较一下同步和异步获取 URL 的情况。

同步 HTTP 请求主要由 requests 库完成,连续获取两个网页看起来像这样:

import requests
import time

start_time = time.time()

def fetch(url):
    return requests.get(url).text

page1 = fetch('http://example.com')
page2 = fetch('http://example.org')

print(f"完成时间:{time.time() - start_time} 秒")

# 输出:完成时间:0.6225857734680176 秒

这段代码非常简单,但它会闲置等待每个请求完成,然后再进行下一个请求。

让我们用 aiohttpasyncio 来提高效率,它们可以用于异步 HTTP 请求:

import aiohttp
import asyncio
import time

async def fetch_async(url, session):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        page1 = asyncio.create_task(fetch_async('http://example.com', session))
        page2 = asyncio.create_task(fetch_async('http://example.org', session))
        await asyncio.gather(page1, page2)

start_time = time.time()
asyncio.run(main())
print(f"完成时间:{time.time() - start_time} 秒")

# 输出:完成时间:0.2990539073944092 秒

这个异步版本不会等待。当一个页面正在被获取时,它会开始获取下一个页面,大大减少了总等待时间。

读取文件(并发 I/O 任务)

让我们探索一个使用 asyncio 进行并发执行的不同用例,不再使用 web 请求。这一次,我们将专注于异步读取多个文件。当处理大型文件或不涉及网络通信的 I/O 密集型任务时,这尤其有用。

在同步设置中,一个接一个地读取多个文件会显著增加执行时间,尤其是在处理大型文件时:

# 同步读取多个文件
def read_file_sync(filepath):
    with open(filepath, 'r') as file:
        return file.read()

def read_all_sync(filepaths):
    return [read_file_sync(filepath) for filepath in filepaths]

filepaths = ['file1.txt', 'file2.txt']
data = read_all_sync(filepaths)
print(data)

对于异步版本,我们将使用 aiofiles,这是一个为异步文件操作提供支持的库。如果您还没有安装 aiofiles,可以使用 pip 安装:

pip install aiofiles

使用 aiofiles,我们可以执行文件 I/O 操作而不会阻塞事件循环,从而允许我们并发读取多个文件。

import asyncio
import aiofiles

# 异步读取单个文件
async def read_file_async(filepath):
    async with aiofiles.open(filepath, 'r') as file:
        return await file.read()

async def read_all_async(filepaths):
    tasks = [read_file_async(filepath) for filepath in filepaths]
    return await asyncio.gather(*tasks)

# 运行异步函数
async def main():
    filepaths = ['file1.txt', 'file2.txt']
    data = await read_all_async(filepaths)
    print(data)

asyncio.run(main())

异步版本通过利用 aiofilesasyncio.gather,允许并发读取多个文件。与同步版本(逐个读取每个文件)相比,这种方法显著减少了总执行时间。通过并发执行 I/O 操作,我们可以提高需要处理多个文件操作的程序的效率。

混合异步和同步:混合方法

有时,你无法逃避同步函数,但仍然想享受异步的乐趣。以下是如何混合它们的方法:

import asyncio
import time

def sync_task():
    print("开始一个缓慢的同步任务...")
    time.sleep(5)  # 模拟一个长时间任务
    print("完成缓慢的任务。")

async def async_wrapper():
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, sync_task)

async def main():
    await asyncio.gather(
        async_wrapper(),
        # 想象一下这里还有其他的异步任务
    )

asyncio.run(main())

提供的代码片段演示了如何在 Python 的 asyncio 库的帮助下,将同步函数集成到异步环境中。

解释代码:

  1. 异步包装器(**async_wrapper**函数):

    • 这个异步函数演示了如何以不阻塞事件循环的方式运行同步 sync_task。它通过利用 loop.run_in_executor(None, sync_task) 来实现这一点。
    • loop.run_in_executor(None, sync_task)sync_task 调度到一个单独的线程或进程中运行,具体取决于所使用的执行器。默认的执行器(第一个参数指定为 None)在一个线程池中运行任务。
    • await 用于等待 sync_task 的完成,而不会阻塞事件循环,从而允许其他异步操作在此期间进行。
  2. 异步执行(**main** 函数):

    • main 异步函数展示了如何同时运行同步任务和异步任务而不会阻塞。
    • asyncio.gather 用于调度 async_wrapper 和其他潜在的异步任务的并发执行。通过使用 gather,您可以确保事件循环可以管理多个任务,并在可能的情况下并发运行它们。
  3. 启动事件循环(**asyncio.run(main())**):

    • 最后,调用 asyncio.run(main()) 来运行 main 协程,这将有效地启动事件循环并执行在 main 中调度的任务。

为什么需要这种方法?

  • 集成遗留代码: 在实际应用程序中,您经常会遇到本质上是同步的遗留代码。为了实现异步兼容性而重写大型代码库并不总是可行的。这种方法允许您将此类代码无缝集成到您的异步应用程序中。
  • 使用阻塞 I/O: 一些操作,特别是那些涉及阻塞 I/O 的操作,没有异步等效操作,或者您可能正在使用仅提供同步函数的第三方库。这种技术允许将这些操作卸载到一个线程中,从而释放事件循环来处理其他异步任务。
  • CPU 密集型任务: 尽管 CPU 密集型任务通常可以通过多处理来更好地处理,因为 Python 的全局解释器锁(GIL)的存在,但您有时可能会选择为了简单起见或因为计算开销不是特别高而在线程中运行它们。使用 run_in_executor 允许这些任务与 I/O 密集型异步任务共存。

Future() 对象

在 Python 的异步编程模型中,Future 是一个低级的可等待对象,它表示异步操作的最终结果。当您创建一个 Future 时,您实际上是在为一个将在将来某个时间点可用的结果声明一个占位符。Futures 是 asyncio 库的关键部分,允许对异步操作进行细粒度控制。

理解 Futures

  • 作用: Futures 用于将低级异步操作与高级 asyncio 应用程序连接起来。它们提供了一种管理异步操作状态的方法:挂起、完成(带结果)或失败(带异常)。
  • 用法: 通常情况下,在使用高级 asyncio 函数和构造(如任务,它是 Future 的子类)时,您不需要自己创建 Futures。但是,理解 Futures 对于与低级异步 API 交互或构建复杂的异步系统至关重要。

使用 Futures

Future 对象有几个关键的方法和属性:

  • set_result(result):设置 Future 的结果。这将将其标记为已完成,并通知所有正在等待的协程。
  • set_exception(exception):设置一个异常作为 Future 的结果。这也会将其标记为已完成,但会在等待时引发异常。
  • add_done_callback(callback):添加一个回调函数,当 Future 完成时(无论是带结果完成还是带异常完成)调用该函数。
  • result():返回 Future 的结果。如果 Future 未完成,则会引发 InvalidStateError。如果 Future 是带异常完成的,则此方法将重新引发该异常。
  • done():如果 Future 已完成,则返回 True。如果 Future 有结果或异常,则认为它已完成。
import asyncio

# 使用 Future 模拟异步操作的函数
async def async_operation(future, data):
    await asyncio.sleep(1)  # 模拟一些异步工作,并带延迟

    # 根据输入数据设置结果或异常
    if data == "success":
        future.set_result("操作成功")
    else:
        future.set_exception(RuntimeError("操作失败"))

# Future 完成时要调用的回调函数
def future_callback(future):
    try:
        print("回调:", future.result())  # 尝试打印结果
    except Exception as exc:
        print("回调:", exc)  # 如果有异常,则打印异常

async def main():
    # 创建一个 Future 对象
    future = asyncio.Future()

    # 为 Future 添加回调
    future.add_done_callback(future_callback)

    # 启动异步操作并传递 Future
    await async_operation(future, "success")  # 尝试将 "success" 更改为其他任何内容以模拟失败

    # 检查 Future 是否已完成并打印其结果
    if future.done():
        try:
            print("主函数:", future.result())
        except Exception as exc:
            print("主函数:", exc)

# 运行主协程
asyncio.run(main())

工作原理

  • async_operation 是一个异步函数,模拟一个异步任务,它接受一个 Future 对象和一些 data 作为参数。它等待 1 秒钟来模拟一些异步工作。根据 data 的值,它要么使用 set_resultFuture 上设置一个结果,要么使用 set_exception 引发一个异常。
  • future_callback 是一个回调函数,一旦 Future 完成就会打印其结果。它通过调用 future.result() 来检查操作是成功还是失败,future.result() 要么返回结果,要么重新引发在 Future 中设置的异常。
  • main 协程中,创建一个 Future 对象,并使用 add_done_callbackfuture_callback 添加为其回调。然后使用 Future 和示例数据("success" 或任何其他值来模拟失败)等待 async_operation
  • async_operation 完成后,main 使用 done() 检查 Future 是否已完成。然后,它尝试直接打印结果,并处理任何潜在的异常。

此示例简洁地演示了使用 Python 中的 asyncio 的 Futures 管理异步操作的基本机制,包括设置结果、处理异常、使用回调和检索操作结果。

结论

在 Python 应用程序中采用 asyncio 可以显著提高 I/O 密集型和网络驱动型程序的性能和可伸缩性。通过理解和应用事件循环、协程、futures 和任务的概念,开发人员可以编写高效、非阻塞的代码,轻松处理数千个并发连接。本文提供的示例虽然不多,但展示了 asyncio 的多功能性,并演示了如何使用它在 Python 应用程序中实现并发,为某些类型的任务提供了优于传统同步代码的明显优势。


评论