Python Asyncio - Using async with python!
As more companies get used to the idea of building data products (which, in my opinion, is a different problem from building a software product), chances are you will need to consider upstream and downstream systems.
If you are working for a big company with resources at your disposal, great. Otherwise, chances are you will need to manage your own product, which comes with architectural and design choices that a data scientist is often completely new to.
One problem that comes up often enough is integrating with feature stores or multiple data sources. Usually such processing happens on the database side and not on your machine, which makes it an IO (input-output) problem. Coupled with the rising popularity of FastAPI, it seems like a good idea to get familiar with asynchronous workflows.
What is asynchronous?
Personally, I find FastAPI's explanation of async helpful. Citing:
Asynchronous code just means that the language 💬 has a way to tell the computer / program 🤖 that at some point in the code, it 🤖 will have to wait for something else to finish somewhere else. Let’s say that something else is called “slow-file” 📝.
One more note: asynchronous code is related to concurrency, which is part of a larger topic and is usually studied alongside parallelism. The Real Python references are a good place to start.
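To make the "wait for something else to finish" idea concrete, here is a minimal sketch run as a plain Python script. The names read_slow_file and events are made up for illustration, and asyncio.sleep stands in for the slow file. While one coroutine waits, the other gets to run, all on a single thread:

```python
import asyncio

events = []  # records the order in which things happen

async def read_slow_file(name, delay):
    events.append(f"{name}: start waiting")
    # await hands control back to the event loop while we wait
    await asyncio.sleep(delay)
    events.append(f"{name}: done")
    return name

async def main():
    # run both waits concurrently on the same thread
    return await asyncio.gather(
        read_slow_file("slow-file-a", 0.2),
        read_slow_file("slow-file-b", 0.1),
    )

results = asyncio.run(main())
print(events)
print(results)
```

Both coroutines start waiting before either one finishes, and the shorter wait completes first even though it was listed second.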
Pre-req & setup
Familiarity with Python 3.7+ (and the use of generators)!
Note, for these examples I executed them with Jupyter notebooks!
These are the packages you need:
asyncio
nest-asyncio
# custom async packages you might need depending on your use case
aiohttp
Normal python code
Consider the following code,
import time

def feat1():
    for i in [10, 20, 40]:
        time.sleep(1)  # simulate waiting on a slow data source
        yield i

def feat2():
    for i in [50, 500, 5000]:
        time.sleep(1)  # simulate waiting on a slow data source
        yield i

time_now = time.perf_counter()
value1 = [x for x in feat1()]
value2 = [x for x in feat2()]
time_end = time.perf_counter()
print("total time taken is {}".format(time_end - time_now))
"""
total time taken is 6.0033508999913465
"""
While value1 was being computed, Python waited 1 second on every iteration before the next value arrived. During that second, we could have been receiving other output or processing data instead of waiting and doing nothing.
This is when Async comes into play:
Async Version
This is the equivalent of the above, just in async!
import asyncio
import time

# async generators
async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(1)
        yield i

async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(1)
        yield i

async def process_feat(asyncgenerator):
    # collect everything the async generator yields into a list
    empty_list = []
    async for i in asyncgenerator:
        empty_list.append(i)
    return empty_list

# schedule both generators to be consumed concurrently
data = asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))
s = time.perf_counter()
# top-level await works in Jupyter/IPython; in a plain script it is a SyntaxError
output = await data
print(output)
elapsed = time.perf_counter() - s
print(elapsed)
"""
[[10, 20, 40], [50, 500, 5000]]
3.005606100021396
"""
(If you encountered an error, go to the next section)
The first thing to realize is writing async code is expensive!
Always go back to fundamentals! Before deciding to apply async, check that your code actually has an IO problem and that it does not already meet your required SLA!
We'll go through the details in the Async sections, but generally the following changes are noticeable:
- functions are defined with async def, and calls that have to wait get an await keyword in front of them
- time.sleep has become asyncio.sleep
- for loops also have async in front of them
Most importantly of all, you need to tell Python which tasks to run together by passing them to asyncio.gather so the async code can run concurrently! Thus, compared with the 6 seconds of the earlier example, running both generators asynchronously takes about 3 seconds instead.
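The same effect can be seen on a smaller scale. This is a rough sketch, written as a plain script, where slow_feature and timed are hypothetical helpers and the delays are shortened to 0.1 seconds. It compares awaiting two coroutines one after the other against handing both to asyncio.gather:

```python
import asyncio
import time

async def slow_feature(value, delay=0.1):
    # stand-in for an IO-bound call such as a database query
    await asyncio.sleep(delay)
    return value

async def sequential():
    # each await finishes before the next coroutine even starts
    a = await slow_feature(1)
    b = await slow_feature(2)
    return [a, b]

async def concurrent():
    # both coroutines wait at the same time
    return await asyncio.gather(slow_feature(1), slow_feature(2))

def timed(coro_func):
    start = time.perf_counter()
    result = asyncio.run(coro_func())
    return result, time.perf_counter() - start

seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(seq_result, round(seq_time, 2))
print(con_result, round(con_time, 2))
```

The results are identical, but the sequential version pays for both waits while the gathered version pays roughly for the longest one only.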
Running with Python
If you are running it as a python script or in a python console, you will encounter this error:
SyntaxError: 'await' outside function
To run it in python, you need to change it to:
async def main():
    data = await asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))
    return data

output = asyncio.run(main())
This is how your script should look:
import asyncio
import time

# async generators
async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(1)
        yield i

async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(1)
        yield i

async def process_feat(asyncgenerator):
    empty_list = []
    async for i in asyncgenerator:
        empty_list.append(i)
    return empty_list

async def main():
    data = await asyncio.gather(process_feat(afeat1()), process_feat(afeat2()))
    return data

s = time.perf_counter()
output = asyncio.run(main())
print(output)
elapsed = time.perf_counter() - s
print(elapsed)
Running it as a python executable in terminal:
(base) /workspaces/asyncio# python example.py
[[10, 20, 40], [50, 500, 5000]]
3.0077412000100594
However, this code will not work if you are using jupyter notebook.
Running with Jupyter
Running this in a notebook will generate this error:
RuntimeError: asyncio.run() cannot be called from a running event loop
It turns out Jupyter is already running an event loop of its own. More information on this can be found on Stack Overflow.
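If you are unsure which situation you are in, a small check like the one below can tell you. This is a sketch run as a plain script, and loop_is_running is a hypothetical helper name. It relies on asyncio.get_running_loop, which raises RuntimeError when no event loop is running in the current thread:

```python
import asyncio

def loop_is_running():
    # get_running_loop raises RuntimeError when there is no running loop
    try:
        asyncio.get_running_loop()
        return True
    except RuntimeError:
        return False

async def check_inside():
    # called from within a coroutine, so a loop is necessarily running
    return loop_is_running()

outside = loop_is_running()            # plain script: no loop yet
inside = asyncio.run(check_inside())   # asyncio.run starts a loop for us
print(outside, inside)
```

In a plain script the first check reports no running loop. In a Jupyter cell the same helper would report one, which is exactly why asyncio.run refuses to start another.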
Use await
The first way to solve it is to skip asyncio.run and use await directly.
s = time.perf_counter()
# output = asyncio.run(main()) # this line is commented out
output = await main()
print(output)
elapsed = time.perf_counter() - s
print(elapsed)
Nested-asyncio
If you need to call asyncio.run in a notebook environment, you can use nest_asyncio. Run the following in a Jupyter cell first:
import nest_asyncio
nest_asyncio.apply()
Async!
Async is a concurrent programming design that has received dedicated support in Python, evolving rapidly since Python 3.4. The keywords async/await were introduced in Python 3.5 and are used to define coroutines and wait on them!
Async Functions
Previously, to use async, you needed to apply a decorator. Since Python 3.5 the syntax has evolved greatly, perhaps following the JavaScript design.
import asyncio

# generator-based coroutines: deprecated since Python 3.8, removed in Python 3.11
@asyncio.coroutine
def py_old_way():
    yield from stuff()

# since Python 3.5
async def py_new_way():
    """Native coroutine, modern syntax"""
    await stuff()
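One thing that tends to surprise people: calling an async def function does not run it. It only creates a coroutine object, which must then be awaited or handed to the event loop. Here is a minimal sketch, run as a plain script, with get_feature as a made-up example function:

```python
import asyncio
import inspect

async def get_feature():
    await asyncio.sleep(0.01)
    return 42

coro = get_feature()                      # nothing has executed yet
is_coroutine = inspect.iscoroutine(coro)  # it is just a coroutine object
value = asyncio.run(coro)                 # running it produces the return value
print(is_coroutine, value)
```

Forgetting the await (or asyncio.run) is a common source of "coroutine was never awaited" warnings.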
Async Sleep, Loops
For most of your code, there is some learning curve / debugging to figure out the async equivalent.
sleep
For example, time.sleep(x) is now changed to asyncio.sleep(x). With for loops or with statements you will need to add async in front of them.
async with aiohttp.ClientSession() as client:
    pass

async for i in asyncgenerator(x):
    pass
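Why the swap from time.sleep to asyncio.sleep matters can be shown with a short sketch, run as a plain script. The helper names blocking_wait, cooperative_wait and run_three are hypothetical. A blocking call inside a coroutine freezes the whole event loop, so gathering three of them gives no speed-up at all:

```python
import asyncio
import time

async def blocking_wait():
    # blocks the entire event loop; no other coroutine can run meanwhile
    time.sleep(0.1)

async def cooperative_wait():
    # yields control to the event loop while waiting
    await asyncio.sleep(0.1)

async def run_three(wait_func):
    start = time.perf_counter()
    await asyncio.gather(wait_func(), wait_func(), wait_func())
    return time.perf_counter() - start

blocking_time = asyncio.run(run_three(blocking_wait))
cooperative_time = asyncio.run(run_three(cooperative_wait))
print(round(blocking_time, 2), round(cooperative_time, 2))
```

The blocking version takes roughly the sum of the three waits, while the cooperative one takes roughly a single wait. The same reasoning applies to any blocking library call, which is why async-aware packages such as aiohttp exist.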
Async Gather
Once you define the tasks to run concurrently, you need to gather them with asyncio.gather. In layman's terms, it's sort of telling Python to collect all the tasks (or coroutines) so they can execute together in an event loop.
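Two behaviours of asyncio.gather are worth knowing. Results come back in the order the coroutines were passed in, not the order they finished, and with return_exceptions=True a failure is returned as a value instead of aborting the whole batch. Here is a small sketch as a plain script, where feature and its fail flag are made up for illustration:

```python
import asyncio

async def feature(name, delay, fail=False):
    await asyncio.sleep(delay)
    if fail:
        raise ValueError(f"{name} failed")
    return name

async def main():
    # "fast" finishes first, but results keep the argument order
    ordered = await asyncio.gather(feature("slow", 0.2), feature("fast", 0.05))
    # exceptions are returned in place instead of being raised
    mixed = await asyncio.gather(
        feature("ok", 0.05),
        feature("broken", 0.05, fail=True),
        return_exceptions=True,
    )
    return ordered, mixed

ordered, mixed = asyncio.run(main())
print(ordered)
print(mixed)
```

Keeping the input order is what makes it safe to unpack the result directly into one variable per task.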
Await
Once the tasks are gathered, await tells Python to pause the current coroutine until all the results are ready before proceeding.
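A small sketch of that hand-off, run as a plain script, with job and pending as hypothetical names. Inside a running loop, asyncio.gather returns a future straight away. The work is only finished once that future has been awaited:

```python
import asyncio

async def job(name):
    await asyncio.sleep(0.05)
    return name.upper()

async def main():
    # gather schedules the jobs and returns a future immediately
    pending = asyncio.gather(job("a"), job("b"))
    done_before = pending.done()   # nothing has finished yet
    results = await pending        # pause here until every job is done
    done_after = pending.done()
    return done_before, done_after, results

done_before, done_after, results = asyncio.run(main())
print(done_before, done_after, results)
```

Anything placed between the gather call and the await runs while the jobs are still pending, which is a handy spot for work that does not depend on their results.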
Async processing
Typically when you hit databases, the response is returned as a generator. The example below retrieves data from multiple generators, processes it, and produces the output in an async fashion.
import asyncio
import math
import time

async def afeat1():
    for i in [10, 20, 40]:
        await asyncio.sleep(0.1)
        yield i

async def afeat2():
    for i in [50, 500, 5000]:
        await asyncio.sleep(0.1)
        yield i

async def task(asyncgenerator, input_func):
    # drain the generator into a list, then apply a plain function to it
    output = input_func([i async for i in asyncgenerator])
    return output

async def complex_task(asyncgenerator):
    # start from +/- infinity so the first value always replaces them
    min_, max_, total, count = math.inf, -math.inf, 0, 0
    async for i in asyncgenerator:
        min_ = min(min_, i)
        max_ = max(max_, i)
        total += i
        count += 1
    mean = total / count
    return [min_, max_, mean]

s = time.perf_counter()
output = asyncio.gather(
    task(afeat1(), sum), task(afeat2(), max), complex_task(afeat2())
)
# top-level await works in Jupyter; in a script, wrap this in main() and call asyncio.run(main())
task1, task2, task3 = await output
print(task1, task2, task3)
elapsed = time.perf_counter() - s
print(elapsed)
"""
70 5000 [50, 5000, 1850.0]
0.3041836000047624
"""
Other Modules
There are many other async packages; I usually refer to the resources section of the Real Python asyncio tutorial. It lists a whole bunch of packages and resources for using async with files, Redis, Postgres, Kafka, etc.
I will be updating the following sections in the future as I explore more async features!
AIOHTTP
AIOHTTP stands for async IO HTTP, and it supports your typical REST commands such as get, post, put, etc.
This is useful when calling multiple websites and waiting for a response:
import aiohttp
import asyncio

async def fetch(client):
    # request the page and return its body as text
    async with client.get("http://python.org") as resp:
        assert resp.status == 200
        return await resp.text()

async def main():
    # the session is opened once and closed when the block exits
    async with aiohttp.ClientSession() as client:
        html = await fetch(client)
        print(html)

# in a script; in Jupyter use `await main()` instead
asyncio.run(main())
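The payoff comes when several sites are requested at once. The sketch below shows the shape of that pattern using only the standard library, so it runs without network access. fake_fetch is a hypothetical stand-in that sleeps instead of making a request, fetch_all is a made-up helper name, and the second URL is just an example. With aiohttp, the stand-in would be swapped for a coroutine that calls client.get as in the example above:

```python
import asyncio

async def fake_fetch(url):
    # hypothetical stand-in for an HTTP GET: sleeps instead of hitting the network
    await asyncio.sleep(0.05)
    return f"<html>{url}</html>"

async def fetch_all(urls, fetch=fake_fetch):
    # start one fetch per URL and wait for all of them together
    return await asyncio.gather(*(fetch(url) for url in urls))

urls = ["http://python.org", "http://example.com"]
pages = asyncio.run(fetch_all(urls))
print(pages)
```

Because the fetch function is passed in as an argument, the same fetch_all can be tested with the stand-in and used with a real HTTP coroutine later.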