admin 管理员组文章数量: 1087649
python:并发编程(十一)
前言
本文将和大家一起探讨python的多协程并发编程(中篇),使用内置基本库asyncio来实现并发,先通过官方来简单使用这个模块。先打好基础,能够有个基本的用法与认知,后续文章,我们再进行详细使用。
本文为python并发编程的第十一篇,上一篇文章地址如下:
python:并发编程(十)_Lion King的博客-CSDN博客
下一篇文章地址如下:
python:并发编程(十二)_Lion King的博客-CSDN博客
一、协程基础编程
1、Runners
在asyncio
中,运行协程(coroutines)的主要方式是使用asyncio
提供的run()
函数或run_until_complete()
方法。这些函数和方法会创建一个asyncio
事件循环(event loop)并运行其中的协程。
下面是一些示例代码,展示了如何使用asyncio
的运行器(runners)来运行协程:
(1)使用asyncio.run()
函数运行一个简单的协程:
import asyncioasync def my_coroutine():print("Running my_coroutine")async def main():await my_coroutine()asyncio.run(main())
(2)使用asyncio.run_until_complete()
方法运行一个任务(Task):
import asyncioasync def my_coroutine():print("Running my_coroutine")async def main():task = asyncio.create_task(my_coroutine())await asyncio.sleep(1) # 模拟其他操作await taskloop = asyncio.get_event_loop()
loop.run_until_complete(main())
(3)使用asyncio.get_event_loop().run_forever()
方法运行一个无限循环的协程:
import asyncioasync def my_coroutine():print("Running my_coroutine")async def main():while True:await my_coroutine()await asyncio.sleep(1) # 模拟每秒执行一次loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_forever()
通过这些运行器,可以方便地运行和管理协程的执行。根据具体的需求,选择适合的运行器来执行协程,并在事件循环中处理其他任务和事件。
2、协程与任务
在asyncio
中,协程(coroutine)和任务(task)是两个关键概念,用于并发地执行异步操作。
协程是一种特殊的函数,使用async
关键字定义,可以在函数内部使用await
语法来挂起执行,并在等待的操作完成后恢复执行。协程可以通过async def
语法定义,或者使用@asyncio.coroutine
装饰器进行标记。
任务是对协程的进一步封装,可以在事件循环中调度和执行。asyncio
提供了create_task()
函数来创建任务,也可以使用ensure_future()
函数将协程转换为任务。任务可以并发地执行,可以设置优先级,可以获取返回值或异常。
下面是一些示例代码,展示了如何使用协程和任务来实现异步操作:
import asyncioasync def my_coroutine():print("Running my_coroutine")await asyncio.sleep(1)print("Coroutine completed")async def main():print("Creating task...")task = asyncio.create_task(my_coroutine()) # 创建任务print("Task created")await asyncio.sleep(0.5)print("Waiting for task to complete...")await task # 等待任务完成print("Task completed")asyncio.run(main())
在上面的示例中,my_coroutine()
函数是一个简单的协程,通过asyncio.sleep()
来模拟异步操作。在main()
函数中,我们使用create_task()
函数创建了一个任务,并在之后通过await task
等待任务完成。
通过协程和任务的结合使用,可以方便地编写异步代码,实现并发和非阻塞的操作。在asyncio
中,任务是协程的扩展,提供了更多的灵活性和控制能力。
3、流
在asyncio
中,流(stream)是一种用于在异步环境中进行读取和写入操作的抽象。它提供了高层次的接口,用于处理网络通信、文件操作和其他类似的I/O操作。
asyncio
中的流包括两个主要类:StreamReader
和StreamWriter
。StreamReader
用于从流中读取数据,而StreamWriter
用于向流中写入数据。这两个类可以配合使用,实现异步的数据传输。
下面是一段示例代码,展示了如何使用asyncio
流进行异步读写操作:
服务端
import asyncioasync def handle_client(reader, writer):while True:data = await reader.read(1024)if not data:breakmessage = data.decode().strip()print(f"Received message from client: {message}")response = f"Server received: {message}"writer.write(response.encode())await writer.drain()print("Client disconnected")writer.close()async def start_server():server = await asyncio.start_server(handle_client, 'localhost', 8888)addr = server.sockets[0].getsockname()print(f'Server started on {addr}')async with server:await server.serve_forever()asyncio.run(start_server())
客户端
import asyncioasync def send_message(message):reader, writer = await asyncio.open_connection('localhost', 8888)writer.write(message.encode())await writer.drain()response = await reader.read(1024)print(f"Received response from server: {response.decode().strip()}")writer.close()await writer.wait_closed()asyncio.run(send_message("Hello, server!"))
这里的服务器端使用asyncio.start_server()
创建一个服务器,监听本地的8888端口。每当有客户端连接时,会调用handle_client()
协程来处理客户端的请求。在handle_client()
中,通过reader.read()
来读取客户端发送的数据,并将接收到的消息打印出来,然后通过writer.write()
将响应消息发送回客户端。
客户端使用asyncio.open_connection()
连接到服务器的地址和端口,并通过writer.write()
发送消息到服务器。然后使用reader.read()
读取服务器的响应,并将响应消息打印出来。
以上是一个简单的使用asyncio
流的客户端和服务器端示例。注意,在实际生产环境中,可能需要处理更复杂的通信逻辑和错误处理。
4、同步原语
在asyncio
中,同步原语(synchronization primitives)用于协调多个协程之间的执行顺序和共享资源的访问。它们提供了一些常用的同步机制,如锁、信号量、事件等,以确保协程在适当的时机进行等待和唤醒操作。
可能跟上一章节有点重复,但还是有必要继续巩固一下,以下是asyncio
中常见的同步原语:
(1)锁(Lock):asyncio.Lock
是一个简单的互斥锁,用于控制对共享资源的访问。一个协程可以使用async with
语句获取锁,并在使用完后释放锁。
import asyncioasync def coro(lock):async with lock:# 执行需要保护的代码passasync def main():lock = asyncio.Lock()await coro(lock)# 创建事件循环并运行异步函数
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上述代码中,将coro()
函数放在一个异步函数main()
中,然后在事件循环中通过loop.run_until_complete()
运行main()
函数。
这样,您就可以在异步环境中使用await
关键字,并正常使用asyncio.Lock()
来实现对代码块的保护。
(2)信号量(Semaphore):asyncio.Semaphore
是一个计数信号量,用于限制同时访问某个资源的协程数量。通过调用acquire()
和release()
方法,协程可以获取和释放信号量。
import asyncioasync def coro(semaphore):await semaphore.acquire()try:# 执行需要保护的代码passfinally:semaphore.release()async def main():semaphore = asyncio.Semaphore(5) # 最多允许5个协程同时访问await coro(semaphore)# 创建事件循环并运行异步函数
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上述代码中,将coro()
函数放在一个异步函数main()
中,然后在事件循环中通过loop.run_until_complete()
运行main()
函数。
这样,您就可以在异步环境中使用await
关键字,并正常使用asyncio.Semaphore()
来实现对代码块的保护。
(3)事件(Event):asyncio.Event
是一个简单的同步事件,用于协程之间的等待和通知。一个协程可以等待事件的状态为真(set)或假(clear),另一个协程可以通过设置事件状态来唤醒等待的协程。
import asyncioasync def coro(event):await event.wait() # 等待事件被设置为真# 执行后续操作async def main():event = asyncio.Event()event.set() # 设置事件状态为真await coro(event)# 创建事件循环并运行异步函数
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
在上述代码中,将coro()
函数放在一个异步函数main()
中,然后在事件循环中通过loop.run_until_complete()
运行main()
函数。
这样,您就可以在异步环境中使用await
关键字,并正常使用asyncio.Event()
来等待事件状态为真。
5、子进程集
在asyncio
中,可以使用asyncio.subprocess
模块来管理和运行子进程。asyncio.subprocess
提供了创建、启动和与子进程进行交互的功能,以便在异步程序中管理子进程的执行。
下面是一个示例代码,演示了如何使用asyncio.subprocess
来创建和运行子进程,并获取其输出:
import asyncioasync def run_command():# 创建子进程proc = await asyncio.create_subprocess_shell('ipconfig', # 要执行的命令stdout=asyncio.subprocess.PIPE # 指定标准输出管道)# 读取子进程的输出output = await proc.stdout.read()# 等待子进程结束await proc.wait()# 打印输出结果print(output)# 创建事件循环并运行子进程
loop = asyncio.get_event_loop()
loop.run_until_complete(run_command())
loop.close()
在上述代码中,create_subprocess_shell()
函数用于创建一个子进程,传递要执行的命令作为参数。通过stdout=asyncio.subprocess.PIPE
设置子进程的标准输出管道,以便在后续可以读取其输出。然后,使用proc.stdout.read()
读取子进程的输出,并使用proc.wait()
等待子进程结束。最后,通过print()
打印输出结果。
这是一个简单的示例,你可以根据实际需求对子进程的执行和输出进行更复杂的处理。asyncio.subprocess
还提供了其他方法和选项,例如指定子进程的工作目录、环境变量等。你可以参考asyncio
官方文档以获取更多详细信息和示例。
6、队列集
在asyncio
中,可以使用asyncio.Queue
来创建异步队列,实现协程之间的安全数据传递和通信。asyncio.Queue
提供了生产者-消费者模式的功能,可以在协程之间进行数据的异步传输。
下面是一个示例代码,演示了如何使用asyncio.Queue
进行协程间的数据传递:
import asyncioasync def producer(queue):for i in range(5):# 生产数据并放入队列await queue.put(i)await asyncio.sleep(1)async def consumer(queue):while True:# 从队列中获取数据data = await queue.get()print('Consumed:', data)await asyncio.sleep(0.5)# 标记任务完成queue.task_done()# 创建事件循环
loop = asyncio.get_event_loop()# 创建异步队列
queue = asyncio.Queue()# 创建生产者和消费者协程任务
producer_task = loop.create_task(producer(queue))
consumer_task = loop.create_task(consumer(queue))# 运行事件循环
try:loop.run_forever()
except KeyboardInterrupt:pass
finally:# 取消协程任务producer_task.cancel()consumer_task.cancel()# 等待任务完成loop.run_until_complete(asyncio.gather(producer_task, consumer_task))# 关闭事件循环loop.close()
在上述代码中,我们创建了一个asyncio.Queue
对象,并在生产者协程中使用queue.put()
将数据放入队列,消费者协程使用queue.get()
从队列中获取数据。协程之间通过队列实现了异步的数据传递。注意,在消费者协程中,我们使用queue.task_done()
来标记任务完成,以便在必要时等待队列中的所有任务完成。
在运行事件循环之前,我们使用loop.create_task()
创建了生产者和消费者协程任务,并在最后通过asyncio.gather()
等待任务完成。通过这种方式,我们可以在事件循环运行期间动态地添加和取消协程任务。
这是一个简单的示例,你可以根据实际需求扩展和修改代码。asyncio.Queue
提供了一系列方法和选项,例如设置队列的最大长度、非阻塞操作等。你可以参考asyncio
官方文档以获取更多详细信息和示例。
本文标签: python并发编程(十一)
版权声明:本文标题:python:并发编程(十一) 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.roclinux.cn/p/1700324352a397174.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论