Python async异步代码入门级10个案例!

Python 异步代码案例

初级async 代码案例实现异步

认识异步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def fetch_data(delay):
print("Fetching data...")
await asyncio.sleep(delay)
print("Data fetched")
return {"data":"Some data"}

async def main():
print("Start of main coroutine")
task = fetch_data(2)
result = await task
print(f"Received result: {result}")
print("End of main coroutine")

asyncio.run(main())

代码解释:

fetch_data 模拟一个网络下载的操作,首先异步等待了我们传入的值,然后模拟下载数据成功。

main 函数定义了开启异步,我们将异步返回的信息存为一个task,然后通过await然后获取值,然后打印出我们下载的信息。

运行结果:

变换一下异步请求形态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def fetch_data(delay):
print("Fetching data...")
await asyncio.sleep(delay)
print("Data fetched")
return {"data":"Some data"}

async def main():
print("Start of main coroutine")
task = fetch_data(2)
print("End of main coroutine")
result = await task
print(f"Received result: {result}")


asyncio.run(main())

代码解释:
我们更改了下面的部分代码,我们可以看到将结束的信息放置到了前面,那么我们就会得到结束数据查询然后才会得到结束的数据!

运行结果:

异步如果这么写效率还是低下!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def fetch_data(delay,id):
print("Fetching data.... id:", id)
await asyncio.sleep(delay)
print("Data fetched, id:",id)
return {"data":"Some data","id":id}

async def main():
task1 = fetch_data(2,1)
task2 = fetch_data(2,2)

result1 = await task1
print(f"Received result: {result1}")

result2 = await task2
print(f"Received result: {result2}")

asyncio.run(main())

代码解释:
从上面可以看出来,我们使用await task1, await task 2 这样的会导致我们只有在task1完成之后,才会去执行task2,效率就会变低!并没有得到效率上的提升。

运行结果:

这样写异步效率嘎嘎的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def fetch_data(id, sleep_time):
print(f"Coroutine {id} starting to fetch data")
await asyncio.sleep(sleep_time)
return {"id":id, "data":f"Same data from coroutine {id}"}

async def main():
task1 = asyncio.create_task(fetch_data(1,2))
task2 = asyncio.create_task(fetch_data(2,3))
task3 = asyncio.create_task(fetch_data(3,1))

result1 = await task1
result2 = await task2
result3 = await task3

print(result1, result2,result3)

asyncio.run(main())

代码解释:
此处我们引入了task这个概念,也就是将我们的任务加入到task中这样就会做到异步执行多个代码,提高我们的运行效率!只需要3秒即可完成所有的程序

运行结果:

我想更加优雅的执行task

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def fetch_data(id, sleep_time):
print(f"Coroutine {id} starting to fetch data.")
await asyncio.sleep(sleep_time)
return {"id":id,"data":f"Sameple data from coroutine {id}"}

async def main():
results = await asyncio.gather(fetch_data(1,2),fetch_data(2,1),fetch_data(3,3))
for result in results:
print(f"Received result: {result}")


asyncio.run(main())

代码解释:
gather 将所有的返回值作为一个对象存储起来,而不需要每个去创建一个task来实现,但是gather并不会处理在程序中出现的错误,
如果其中一个失败了,其他的会继续执行。

运行结果:

出错后怎么自动停止呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import asyncio

async def fetch_data(id, sleep_time):
print(f"Coroutine {id} starting to fetch data.")
await asyncio.sleep(sleep_time)
return {"id":id,"data":f"Sameple data from coroutine {id}"}

async def main():
tasks =[]
async with asyncio.TaskGroup() as tg:
for i, sleep_time in enumerate([2,1,3], start=1):
task = tg.create_task(fetch_data(i,sleep_time))
tasks.append(task)

results = [task.result() for task in tasks]

for result in results:
print(f"Received result: {result}")

asyncio.run(main())

代码解释:
这里我们新增的叫做事务组,他提供了更加好的对于错误的处理,如果其中一个出错了,那么其他的所有任务也都会停止运行!这里也是用到了上下文管理器来处理!

运行结果:

异步底层的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def set_future_result(future, value):
await asyncio.sleep(2)
future.set_result(value) # 设置结果
print(f"Set the future's result to: {value}")

async def main():
loop = asyncio.get_running_loop() # 获取事件循环
future = loop.create_future() # 创建future

asyncio.create_task(set_future_result(future,"Future result is reday")) # 然后创建我们的任务实现未来的结果

result = await future
print(f"Received the future's result is {result}") # 等待完成后打印结果

asyncio.run(main())

代码解释:
上面的代码实际我们不需要掌握,因为在目前都已经被封装好了,属于略微底层实现的代码,我们这里的future实际上是未来的结果,但是你并不知道会在什么时候发生,我们在上面的代码里面就是创建了future,并且等待这个未来的结果

运行结果:

异步锁的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

shared_resource=0

lock=asyncio.Lock()

async def modify_shared_resource():
global shared_resource

async with lock:
print(f"Resource before modification: {shared_resource}")
shared_resource += 1
await asyncio.sleep(1)
print(f"Resource after modification: {shared_resource}")

async def main():
await asyncio.gather(*(modify_shared_resource() for _ in range(5)))

asyncio.run(main())

代码解释:
我们这里引入锁的概念,加上锁可以防止异步访问导致出现的数据问题,例如存储数据库,表格等,异步情况下会出现同时修改数据的情况,所以我们需要加上锁,再同一时间只能有一个进行数据的读写操作!上下文操作使得我们不必关心释放锁的情况。

运行结果:

异步信号量的概念

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import asyncio

async def access_resource(semaphore, resource_id):
async with semaphore:
print(f"Accessing resource {resource_id}")
await asyncio.sleep(1)
print(f"Releasing resource {resource_id}")

async def main():
semaphore = asyncio.Semaphore(2) # 创建2个信号量
await asyncio.gather(*(access_resource(semaphore,i)for i in range(5)))

asyncio.run(main())

代码解释:
这里我们引入了信号量的概念,它和锁的概念类似,但是它是运行同一时刻候多少个程序可以访问一个对象,我们可以定义我们要使用的数量,可以进行限制,防止资源访问过载,也就是最大一次可以访问多少个。

运行结果:

异步event实现模拟同步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import asyncio

async def waiter(event):
print("waiting for the event to be set")
await event.wait() # 等待事件
print("event has been set, continuing execution")

async def setter(event):
await asyncio.sleep(2)
event.set() # 设置事件
print("event has been set!")

async def main():
event = asyncio.Event()
await asyncio.gather(waiter(event), setter(event))

asyncio.run(main())

代码解释
这里我们引入了事件Event, 它可以是我们模拟同步的情况,在异步中可能需要暂时的同步的时候可以使用,一旦我们设置了事件,那么我们必须等待事件完成后,才可以继续执行下一步!

运行结果:

总结

当我们以正确的方式打开异步请求我们的效率就会提升很多!


Python async异步代码入门级10个案例!
https://dreamshao.github.io/2025/07/07/async异步代码/
作者
Yun Shao
发布于
2025年7月7日
许可协议