Python Async Code Examples (Beginner Level)

Getting to know async code

```python
import asyncio

async def fetch_data(delay):
    print("Fetching data...")
    await asyncio.sleep(delay)
    print("Data fetched")
    return {"data": "Some data"}

async def main():
    print("Start of main coroutine")
    task = fetch_data(2)
    result = await task
    print(f"Received result: {result}")
    print("End of main coroutine")

asyncio.run(main())
```
Code explanation:
fetch_data simulates a network download: it asynchronously waits for the delay we pass in, then pretends the download succeeded and returns some data.
main is the entry coroutine: we store the coroutine returned by fetch_data in task, await it to get the value, and print the downloaded data.
Rearranging the async call

```python
import asyncio

async def fetch_data(delay):
    print("Fetching data...")
    await asyncio.sleep(delay)
    print("Data fetched")
    return {"data": "Some data"}

async def main():
    print("Start of main coroutine")
    task = fetch_data(2)
    print("End of main coroutine")
    result = await task
    print(f"Received result: {result}")

asyncio.run(main())
```
Code explanation: we moved the "End of main coroutine" print in front of the await. Calling fetch_data(2) only creates a coroutine object; its body does not start running until we await it. So "End of main coroutine" is printed first, and only then does the fetching begin and finish.
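To make that concrete, here is a minimal sketch of our own (not from the original example) showing that calling an async function merely builds a coroutine object, and the body only runs once it is awaited:

```python
import asyncio

async def fetch_data(delay):
    await asyncio.sleep(delay)
    return {"data": "Some data"}

# Calling the async function does NOT run its body; it just builds a coroutine.
coro = fetch_data(0)
print(asyncio.iscoroutine(coro))  # True, and nothing has been fetched yet

# The body only executes once the coroutine is awaited (asyncio.run awaits it).
result = asyncio.run(coro)
print(result)  # {'data': 'Some data'}
```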
Written this way, async is still slow!

```python
import asyncio

async def fetch_data(delay, id):
    print("Fetching data.... id:", id)
    await asyncio.sleep(delay)
    print("Data fetched, id:", id)
    return {"data": "Some data", "id": id}

async def main():
    task1 = fetch_data(2, 1)
    task2 = fetch_data(2, 2)

    result1 = await task1
    print(f"Received result: {result1}")

    result2 = await task2
    print(f"Received result: {result2}")

asyncio.run(main())
```
Code explanation: with `await task1` followed by `await task2`, task2 only starts after task1 has completely finished, so the two 2-second fetches take about 4 seconds in total. There is no concurrency and no speedup.
Write it this way and async really flies

```python
import asyncio

async def fetch_data(id, sleep_time):
    print(f"Coroutine {id} starting to fetch data")
    await asyncio.sleep(sleep_time)
    return {"id": id, "data": f"Sample data from coroutine {id}"}

async def main():
    # create_task schedules each coroutine on the event loop immediately
    task1 = asyncio.create_task(fetch_data(1, 2))
    task2 = asyncio.create_task(fetch_data(2, 3))
    task3 = asyncio.create_task(fetch_data(3, 1))

    result1 = await task1
    result2 = await task2
    result3 = await task3
    print(result1, result2, result3)

asyncio.run(main())
```
Code explanation: here we introduce tasks. asyncio.create_task schedules each coroutine on the event loop right away, so all three run concurrently and the whole program finishes in about 3 seconds (the longest single delay) instead of the 6-second sum.
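A quick timing sketch of our own (with scaled-down delays so it runs fast) makes the difference measurable: awaiting coroutines one by one costs the sum of the delays, while creating tasks up front costs only the longest one.

```python
import asyncio
import time

async def fetch_data(id, sleep_time):
    await asyncio.sleep(sleep_time)
    return {"id": id}

async def sequential():
    # Awaiting one coroutine after another: total time is the SUM of the delays.
    for i, t in [(1, 0.2), (2, 0.3), (3, 0.1)]:
        await fetch_data(i, t)

async def concurrent():
    # create_task schedules everything up front: total time is the LONGEST delay.
    tasks = [asyncio.create_task(fetch_data(i, t)) for i, t in [(1, 0.2), (2, 0.3), (3, 0.1)]]
    for task in tasks:
        await task

for label, fn in [("sequential", sequential), ("concurrent", concurrent)]:
    start = time.perf_counter()
    asyncio.run(fn())
    print(f"{label}: {time.perf_counter() - start:.2f}s")  # ~0.60s vs ~0.30s
```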
Running tasks more elegantly

```python
import asyncio

async def fetch_data(id, sleep_time):
    print(f"Coroutine {id} starting to fetch data.")
    await asyncio.sleep(sleep_time)
    return {"id": id, "data": f"Sample data from coroutine {id}"}

async def main():
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 1),
        fetch_data(3, 3),
    )
    for result in results:
        print(f"Received result: {result}")

asyncio.run(main())
```
Code explanation: asyncio.gather runs all the coroutines concurrently and collects their return values into a single list (in the order they were passed), without creating each task by hand. By default, though, gather does not manage errors for you: if one coroutine raises, the exception propagates from the await while the other coroutines keep running.
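When you do want gather to keep going and report failures alongside the results, you can pass return_exceptions=True. A small sketch (the failing coroutine here is made up for illustration):

```python
import asyncio

async def fetch_data(id, sleep_time):
    await asyncio.sleep(sleep_time)
    if id == 2:  # simulate one coroutine failing
        raise ValueError(f"coroutine {id} failed")
    return {"id": id}

async def main():
    # With return_exceptions=True, exceptions are placed into the results list
    # instead of being raised, so the successful results are not lost.
    return await asyncio.gather(
        fetch_data(1, 0.1), fetch_data(2, 0.1), fetch_data(3, 0.1),
        return_exceptions=True,
    )

results = asyncio.run(main())
for r in results:
    print("Failed:" if isinstance(r, Exception) else "Received:", r)
```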
How do we stop automatically on error?

```python
import asyncio

async def fetch_data(id, sleep_time):
    print(f"Coroutine {id} starting to fetch data.")
    await asyncio.sleep(sleep_time)
    return {"id": id, "data": f"Sample data from coroutine {id}"}

async def main():
    tasks = []
    async with asyncio.TaskGroup() as tg:
        for i, sleep_time in enumerate([2, 1, 3], start=1):
            task = tg.create_task(fetch_data(i, sleep_time))
            tasks.append(task)

    results = [task.result() for task in tasks]
    for result in results:
        print(f"Received result: {result}")

asyncio.run(main())
```
Code explanation: here we introduce a task group (asyncio.TaskGroup, available since Python 3.11). It provides much better error handling: if one task fails, all the remaining tasks are cancelled. It is used as an async context manager, so by the time the `async with` block exits, every task is guaranteed to have finished.
The lower-level machinery: futures

```python
import asyncio

async def set_future_result(future, value):
    await asyncio.sleep(2)
    future.set_result(value)
    print(f"Set the future's result to: {value}")

async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    asyncio.create_task(set_future_result(future, "Future result is ready"))

    result = await future
    print(f"Received the future's result: {result}")

asyncio.run(main())
```
Code explanation: you rarely need to write code like this yourself, because the higher-level APIs above already wrap it; it is the slightly lower-level building block. A Future represents a result that will exist at some point, though you do not know exactly when. Here we create the future, hand it to a task that will eventually fill it in, and await the future until that result arrives.
Async locks

```python
import asyncio

shared_resource = 0
lock = asyncio.Lock()

async def modify_shared_resource():
    global shared_resource
    async with lock:
        print(f"Resource before modification: {shared_resource}")
        shared_resource += 1
        await asyncio.sleep(1)
        print(f"Resource after modification: {shared_resource}")

async def main():
    await asyncio.gather(*(modify_shared_resource() for _ in range(5)))

asyncio.run(main())
```
Code explanation: here we introduce a lock. Holding the lock prevents concurrent coroutines from corrupting shared state (a database record, a spreadsheet, a counter): without it, two coroutines could interleave their read-modify-write steps and clobber each other's updates. With the lock, only one coroutine at a time can be inside the critical section, and the `async with` context manager releases the lock for us automatically.
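To see why the lock matters, here is a sketch of our own of the same counter without a lock, where the read and the write are separated by an await point; every coroutine reads the same stale value:

```python
import asyncio

counter = 0

async def unsafe_increment():
    global counter
    value = counter          # read
    await asyncio.sleep(0)   # yield to the event loop mid-update
    counter = value + 1      # write back a now-stale value

async def main():
    await asyncio.gather(*(unsafe_increment() for _ in range(5)))

asyncio.run(main())
print(counter)  # 1, not 5: all five coroutines read counter while it was still 0
```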
Async semaphores

```python
import asyncio

async def access_resource(semaphore, resource_id):
    async with semaphore:
        print(f"Accessing resource {resource_id}")
        await asyncio.sleep(1)
        print(f"Releasing resource {resource_id}")

async def main():
    semaphore = asyncio.Semaphore(2)
    await asyncio.gather(*(access_resource(semaphore, i) for i in range(5)))

asyncio.run(main())
```
Code explanation: a semaphore is similar to a lock, except that it allows a configurable number of coroutines to access the resource at the same time. In effect it is a concurrency limit: here at most 2 coroutines can be inside the `async with` block at once, which protects the resource from being overloaded.
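A scaled-down timing sketch of our own confirms the limit: five 0.1-second jobs through a Semaphore(2) run in three batches (2 + 2 + 1), so the total is about 0.3 s rather than 0.1 s (fully concurrent) or 0.5 s (fully sequential).

```python
import asyncio
import time

async def access_resource(semaphore, resource_id):
    async with semaphore:            # at most 2 holders at any moment
        await asyncio.sleep(0.1)     # stand-in for the real work

async def main():
    semaphore = asyncio.Semaphore(2)
    start = time.perf_counter()
    await asyncio.gather(*(access_resource(semaphore, i) for i in range(5)))
    return time.perf_counter() - start

elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.30s: three rounds of 0.1s
```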
Using an Event to synchronize coroutines

```python
import asyncio

async def waiter(event):
    print("waiting for the event to be set")
    await event.wait()
    print("event has been set, continuing execution")

async def setter(event):
    await asyncio.sleep(2)
    event.set()
    print("event has been set!")

async def main():
    event = asyncio.Event()
    await asyncio.gather(waiter(event), setter(event))

asyncio.run(main())
```
Code explanation: here we introduce asyncio.Event, which lets us add a point of synchronization inside otherwise asynchronous code. The waiter blocks on `event.wait()` and cannot continue to the next step until another coroutine calls `event.set()`.
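One detail worth knowing, shown in a small sketch of our own: a single `event.set()` releases every coroutine waiting on that event, and `event.is_set()` lets you check the flag without blocking.

```python
import asyncio

async def main():
    event = asyncio.Event()
    released = []

    async def waiter(name):
        await event.wait()       # all waiters block on the same event
        released.append(name)

    tasks = [asyncio.create_task(waiter(f"w{i}")) for i in range(3)]
    await asyncio.sleep(0.1)
    print(event.is_set())        # False: nobody has set the event yet
    event.set()                  # one call wakes every waiter
    await asyncio.gather(*tasks)
    return released

released = asyncio.run(main())
print(released)
```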
Summary: when we use async the right way, our I/O-bound code gets dramatically faster!