A lunchtime story to demonstrate threading, asyncio, multiprocessing & cloud functions
Mar 17, 2021
I'm going to tell a story to explain the different concurrency and parallelism options in Python.
In this story we will see how a single person multitasking is like concurrency, and multiple people doing their own tasks is like parallelism. We'll watch these scenarios in action at some well known lunchtime restaurants as they quickly and efficiently serve their customers. Then I'll implement these restaurants in Python, and finally I'll compare the different concurrency options and explain when to use each one.
I'll explain:

- the difference between concurrency and parallelism
- the options available in Python: threading, asyncio, multiprocessing, and cloud functions
- when to choose each one
Let's start with the definitions:
A system is said to be concurrent if it can support two or more actions in progress at the same time.
A system is said to be parallel if it can support two or more actions executing simultaneously.
The key concept and difference between these definitions is the phrase "in progress."
And now let's jump right into the story.
At lunchtime you turn down a street you never noticed before. It has two food options: a market stall called Concurrent Burgers, and a shop called Parallel Salads.
Both of them look delicious but have long queues so you wonder which one will serve you first.
Concurrent Burgers is run by a middle-aged lady with a Python tattoo on her arm who laughs heartily as she works. She's performing the following tasks:

- taking orders from customers
- cooking patties on the grill
- making burgers and completing orders
She switches between the tasks seamlessly. At one moment she's checking the patties on the grill and removing any that are cooked, the next she's taking an order, and the next, if any patties are ready, she's making a burger and completing the juicy order.
Parallel Salads is staffed by a number of identical men with plastered-on smiles who politely converse as they work. Each of them makes a salad for a single customer: he takes the order, adds all the ingredients to a fresh bowl, smothers it in dressing, exuberantly mixes it up, fills a container with a healthy salad, and discards the bowl. Meanwhile, another clone takes the dirty bowls and washes them.
The key difference between the two restaurants is the number of workers and how the tasks are being performed:

- Concurrent Burgers has a single worker who switches between her tasks as needed
- Parallel Salads has multiple workers, each carrying out their own task from start to finish
You notice that both restaurants are serving customers at the same rate. The woman in Concurrent Burgers is making multiple burgers at the same time and is limited by the rate that her tiny grill will output cooked patties. Parallel Salads is employing multiple men to make a single salad at a time and is limited by the length of time it takes to put a salad together.
You realise that Concurrent Burgers is I/O bound and Parallel Salads is CPU bound:

- I/O bound - the speed of service is limited by waiting on input/output: the grill. The worker stays busy by switching between tasks while the patties cook.
- CPU bound - the speed of service is limited by the speed of the workers: each salad takes a fixed amount of hands-on effort, so serving customers faster requires more workers.
Unable to make a decision, you remain locked in the same state for five minutes looking bewildered, before an opinionated friend interrupts you and invites you to join them in a queue.
N.B. Parallel Salads is concurrent as well as parallel, as "two or more actions are in progress at the same time". Parallel processing is a subset of concurrent processing.
These two shops provide an intuition for the difference between concurrent and parallel tasks. We'll now investigate how to implement both of these in Python.
Python has two options available for concurrency:

- threading
- asyncio

It has this library built in for parallelism:

- multiprocessing

There is another option for parallelism when running Python programs in the cloud:

- cloud functions
Let's look at two possible implementations of Concurrent Burgers, using threading and asyncio. In both cases there's a single worker taking orders, cooking patties, and making burgers.
With both threading and asyncio only a single processor is running, but it jumps between the different tasks it needs to do. The difference between threading and asyncio is how the decision to change task is made.
With threading, the worker can be switched between tasks at any point during execution. She is halfway through taking an order when she suddenly switches to checking the patties or making a burger, before switching again, at any point, to one of the other tasks.
Let's have a look at Concurrent Burgers implemented using threads:
```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor

# Note: Some methods and variables are skipped
# to focus only on the threading details

def run_concurrent_burgers():
    # Create blocking queues
    customers = queue.Queue()
    orders = queue.Queue(maxsize=5)  # Process up to 5 orders at once
    cooked_patties = queue.Queue()

    # The grill is entirely independent of the worker,
    # and turns raw patties into cooked patties.
    # This is like reading from disk or doing a network request
    grill = Grill()

    # Run the three tasks using a thread pool executor
    with ThreadPoolExecutor() as executor:
        executor.submit(take_orders, customers, orders)
        executor.submit(cook_patties, grill, cooked_patties)
        executor.submit(make_burgers, orders, cooked_patties)

def take_orders(customers, orders):
    while True:
        customer = customers.get()
        order = take_order(customer)
        orders.put(order)

def cook_patties(grill, cooked_patties):
    for position in range(len(grill)):
        grill[position] = raw_patties.pop()

    while True:
        for position, patty in enumerate(grill):
            if patty.cooked:
                cooked_patties.put(patty)
                grill[position] = raw_patties.pop()

        # Don't check again for another minute
        time.sleep(60)

def make_burgers(orders, cooked_patties):
    while True:
        patty = cooked_patties.get()
        order = orders.get()
        burger = order.make_burger(patty)
        customer = order.shout_for_customer()
        customer.serve(burger)
```
Each of the tasks of taking orders, cooking patties, and making burgers is an infinite loop that keeps performing its action.
In run_concurrent_burgers we start each of the tasks in a separate thread. We could manually create a thread for each task, but there is a much nicer interface called ThreadPoolExecutor, which creates a thread for each task that we submit to it.
When using multiple threads we must ensure that only one thread at a time is reading or writing any piece of state. Otherwise we could end up in a situation where two threads are holding the same patty, leaving us with a rather angry customer. Ensuring this can't happen is known as thread safety.
To avoid this issue we use Queues to pass state around. Within the individual tasks, the Queues block when calling get until there's a customer, order, or patty ready. The operating system won't try to switch to a thread that is blocked, which gives us an easy way to safely hand off state. As long as the thread putting the state onto a Queue doesn't use it again, the thread getting the state knows it won't be changed while it is being used.
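To make the danger concrete, here's a minimal standalone sketch, separate from the burger code and with a hypothetical shared counter, of two threads mutating shared state without a queue or lock:

```python
import threading
import time

orders_taken = 0  # Shared state - every thread reads and writes it

def take_order():
    global orders_taken
    current = orders_taken      # Read the shared value
    time.sleep(0)               # Simulate a moment of work - the OS may
                                # switch to another thread right here
    orders_taken = current + 1  # Write back, clobbering any update made
                                # by another thread in the meantime

threads = [threading.Thread(target=take_order) for _ in range(100)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(orders_taken)  # Frequently prints less than 100
```

Wrapping the read-modify-write in a threading.Lock, or handing values between threads with Queues as above, removes the race.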
Threads are comparatively expensive: an application can run far more asyncio tasks than threads, since asyncio avoids the overhead of switching between system threads.

In asyncio there is a single event loop which manages all the tasks. Tasks can be in a number of different states, but the two most important are ready and waiting. On each loop the event loop checks whether any waiting tasks are now ready due to another task completing. Then it picks a ready task and runs it until it completes or needs to wait for another task - often an I/O operation like reading from the disk or making an HTTP request.
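As a tiny standalone illustration of this cooperative scheduling (separate from the burger code), two tasks that await between steps take turns on the single event loop:

```python
import asyncio

async def work(name):
    for step in range(3):
        print(f"{name} step {step}")
        # Awaiting hands control back to the event loop,
        # which can then pick the other ready task
        await asyncio.sleep(0)

async def main():
    await asyncio.gather(work("A"), work("B"))

asyncio.run(main())  # Prints A and B steps interleaved
```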
There are two keywords that cover the majority of asyncio uses: async and await.
Let's have a look at Concurrent Burgers implemented using asyncio:
```python
import asyncio

# Note: Some methods and variables are skipped
# to focus only on the asyncio details

async def run_concurrent_burgers():
    # These queues give up control when waiting
    customers = asyncio.Queue()
    orders = asyncio.Queue(maxsize=5)  # Only process up to five orders at once
    cooked_patties = asyncio.Queue()

    # The grill runs entirely independently of the worker,
    # and turns raw patties into cooked patties
    grill = Grill()

    # Run all tasks using the default asyncio event loop
    await asyncio.gather(
        take_orders(customers, orders),
        cook_patties(grill, cooked_patties),
        make_burgers(orders, cooked_patties),
    )

# Declare asyncio tasks with async def
async def take_orders(customers, orders):
    while True:
        # Allow switching to another task here
        # and at all other awaits
        customer = await customers.get()
        order = take_order(customer)
        await orders.put(order)

async def cook_patties(grill, cooked_patties):
    for position in range(len(grill)):
        grill[position] = raw_patties.pop()

    while True:
        for position, patty in enumerate(grill):
            if patty.cooked:
                # put_nowait allows us to add to the queue without
                # creating a new task and giving up control
                cooked_patties.put_nowait(patty)
                grill[position] = raw_patties.pop()

        # Wait 30 seconds before checking again
        await asyncio.sleep(30)

async def make_burgers(orders, cooked_patties):
    while True:
        patty = await cooked_patties.get()
        order = await orders.get()
        burger = order.make_burger(patty)
        customer = await order.shout_for_customer()
        customer.serve(burger)
```
Each of the tasks of taking orders, cooking patties, and making burgers is declared with async def. Within these tasks the worker switches to a new task each time await is called. This happens:

- while waiting for the next customer, order, or cooked patty
- while waiting for space in the orders queue
- while sleeping between checks of the grill
- while shouting for a customer to collect their burger
The final piece of the puzzle is run_concurrent_burgers, which awaits asyncio.gather to schedule all three tasks to be run by the event loop, which in this case is our worker.
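To actually set the worker going, the top-level coroutine has to be handed to the event loop. Assuming Python 3.7+, the usual entry point is asyncio.run:

```python
if __name__ == "__main__":
    asyncio.run(run_concurrent_burgers())
```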
As we know exactly when the tasks switch, we actually don't need to be careful about sharing state. We could implement this using just lists for the queues and know that two tasks would never accidentally be holding the same patty. However, asyncio queues are highly recommended, as they make it very easy for the tasks to cooperate by providing sensible points for suspending the current task.
One interesting aspect of using asyncio is that the async keyword changes the interface of the function: it cannot be directly called from non-async functions. This can be considered a good or bad thing. On the one hand you could say it hurts composability, as you cannot mix asyncio and normal functions. On the other hand, if asyncio is only used for I/O, this forces a separation of I/O and business logic, restricting the asyncio code to the edge of the application and making the code base easier to understand and test. Explicitly labelling I/O is a fairly common practice in typed functional languages - in Haskell it is required.
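You can see this changed interface directly in a small sketch (fetch_greeting is a hypothetical async function standing in for some I/O): calling it doesn't run anything, it just returns a coroutine object that must be awaited or handed to the event loop.

```python
import asyncio

async def fetch_greeting():
    await asyncio.sleep(0.1)  # Stand-in for real I/O
    return "hello"

coro = fetch_greeting()
print(coro)               # <coroutine object fetch_greeting ...> - nothing ran yet
print(asyncio.run(coro))  # hello - the event loop actually ran it
```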
For I/O bound tasks like these, asyncio is generally preferred over threading, as each task carries far less overhead - check out some benchmarks comparing the two.
At Parallel Salads there are multiple workers making salads in parallel, and we're going to implement it using multiprocessing.
Then we're going to visit Cloud Coffees to see how cloud functions can be used to run tasks in parallel.
Speeding up CPU bound work requires true parallelism: multiple workers running at the same time on multiple cores. This is perfectly demonstrated by Parallel Salads.

Each worker in Parallel Salads is represented by a new process spawned by the operating system. These processes are created through a ProcessPoolExecutor, which assigns tasks to each of them.
```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Note: Some methods and variables are skipped to
# focus only on the multiprocessing details

def run_parallel_salads():
    # Create multiprocessing queues that can
    # communicate across process boundaries
    customers = mp.Queue()
    bowls = mp.Queue()
    dirty_bowls = mp.Queue()

    # Run the tasks in parallel using a process pool executor
    with ProcessPoolExecutor(max_workers=NUM_STAFF) as executor:
        # Set all but one worker making salads
        for _ in range(NUM_STAFF - 1):
            executor.submit(make_salad, customers, bowls, dirty_bowls)

        # Set the other worker washing bowls
        executor.submit(wash_bowls, dirty_bowls, bowls)

def make_salad(customers, bowls, dirty_bowls):
    while True:
        customer = customers.get()
        order = take_order(customer)
        bowl = bowls.get()
        bowl.add(ingredients)
        bowl.add(dressing)
        bowl.mix()
        salad = fill_container(bowl)
        customer.serve(salad)
        dirty_bowls.put(bowl)

def wash_bowls(dirty_bowls, bowls):
    while True:
        bowl = dirty_bowls.get()
        wash(bowl)
        bowls.put(bowl)
```
With multiprocessing, each task is run in a separate process. These processes are independently run in parallel by the operating system and don't block each other. The number of processes that can actually run in parallel is limited by the number of cores on the CPU, so we would limit the number of staff actually making salads to this.
As these tasks are in different processes, they don't share any normal Python state. Each process has an independent copy of the whole program state, so we must use special multiprocessing queues to communicate between them.
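Here's a minimal standalone sketch of that isolation (the bowls_washed counter is hypothetical): a change made in a child process never shows up in the parent, which is why results have to travel back through a multiprocessing queue.

```python
import multiprocessing as mp

bowls_washed = 0  # Each process gets its own copy of this global

def wash_in_child(results):
    global bowls_washed
    bowls_washed += 1          # Only changes the child's copy
    results.put(bowls_washed)  # Results must be sent back explicitly

if __name__ == "__main__":
    results = mp.Queue()
    child = mp.Process(target=wash_in_child, args=(results,))
    child.start()
    child.join()
    print(results.get())   # 1 - the value sent back from the child
    print(bowls_washed)    # 0 - the parent's copy is untouched
```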
One use case for multiprocessing is offloading CPU intensive tasks in an asyncio application to stop them from blocking the rest of the application. Here's a short sketch of how to do that:
```python
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor()  # Default size is the number of cores

async def handle_long_request(n):
    event_loop = asyncio.get_running_loop()
    # calculate_n_pi will be run in a separate process, allowing the asyncio
    # event loop to continue to handle other async tasks in parallel
    return await event_loop.run_in_executor(process_pool, calculate_n_pi, n)

def calculate_n_pi(n):
    time.sleep(60)  # Stand-in for a long CPU bound calculation
    return n * 3.14
```
As you and your friend walk to the park to eat your lunch you spot a fluffy multicoloured cloud hovering above a group of people. You peer closer and see a sign for Cloud Coffees.
Even though your friend hates coffee you both decide to grab one for fun. As you walk up you are each confronted by your own individual stall with a single barista which slowly floats down out of the cloud. You place your order and the barista makes your coffee and serves it to you.
A boisterous crowd of people suddenly arrive at Cloud Coffees, and after a short wait for more stalls to float down they are all quickly served. These extra baristas wait around for a little while for more customers, completely oblivious to the other stalls, before floating back into the cloud.
As you step back you see that at any one time there are about the same number of stalls as there are customers placing an order. If more customers arrive, more stalls appear out of the cloud; when the orders are complete, after a little wait the stalls disappear back into it.
Your friend has asked for a ridiculously complex order to try to drown out the taste of coffee, and still hasn't got his drink. The barista is adding marshmallows and a chocolate flake when they suddenly throw the whole coffee unceremoniously in the bin and shout "Timeout!" at him.
You both walk off to the park in hysterics.
Cloud functions are another option worth considering if you're writing a web service. They are by far the easiest to write: each instance fulfils only a single request at a time, so you can completely forget about concurrency.
```python
def cloud_coffees(order):
    ground_coffee = grind_beans()
    coffee = brew_coffee(ground_coffee)
    coffee.add_embellishments(order)
    return coffee
```
Each request is fulfilled by a separate instance of the whole application. When a new instance is created there is a little latency involved as it starts up. For this reason an instance may stick around waiting for more requests which will be fulfilled with almost no latency. After a little while if there are no requests it will be reclaimed.
Each request will time out after a few minutes, depending on the implementation. You have to make sure your tasks complete before this timeout or they will vanish without finishing.
Instances cannot communicate with other instances and should never store any state between requests as the instance could disappear at any point.
The most common implementations of this are AWS Lambda, Azure Functions and Google Cloud Functions.
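As a concrete sketch, an AWS Lambda version of the coffee service might look like this - the lambda_handler signature is Lambda's convention, while the event shape and the coffee helpers are the same hypothetical ones as above:

```python
def lambda_handler(event, context):
    # Each invocation may land on a brand new instance, so build
    # everything from scratch and store nothing between requests
    order = event["order"]
    ground_coffee = grind_beans()
    coffee = brew_coffee(ground_coffee)
    coffee.add_embellishments(order)
    return {"statusCode": 200, "body": str(coffee)}
```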
Let's summarise everything we've talked about into a single table.
| | threading | asyncio | multiprocessing | cloud functions |
|---|---|---|---|---|
| Concurrency type | Pre-emptive multitasking | Cooperative multitasking | Multiprocessing | Multi-instance |
| Concurrent or parallel | Concurrent | Concurrent | Parallel | Parallel |
| Explicit control of concurrency | No | Yes | No | No |
| Switching decision | The operating system decides when to switch tasks | The tasks decide when to give up control | The processes run at the same time on different CPU cores | The requests run at the same time in different instances |
| Maximum parallel processes | 1 | 1 | Number of cores in the CPU | Unlimited |
| Communication between tasks | Shared state | Shared state | Multiprocessing queues and return values | Impossible |
| Thread safe | No | Yes | Yes | Yes |
| Tasks to use it for | I/O bound | I/O bound | CPU bound | CPU bound, if it completes within the timeout (~5 minutes) |
| Task overhead | A system thread per task consumes RAM and increases switching time between tasks | As little as possible - all tasks run in a single thread of a single process | A system process per task consumes more RAM and switching time than threading | Starting new instances comes with a latency cost |
Now that you understand all the options, it becomes easy to select one.
Before doing this though you should double check that you actually need to speed up your tasks. If it's being run once a week and takes 10 minutes, is there any point in speeding it up?
If it is, then just refer to this flowchart:
You've now seen examples of the core concurrency options available in Python:

- threading
- asyncio
- multiprocessing

as well as a deployment option that provides a simplified environment for parallel Python:

- cloud functions
You also know the differences between them, the pros and cons of each, and when to select each option.
I hope you find this useful!
Sourcery helps you write better code faster by understanding your existing code and suggesting refactorings to improve its quality.
It won't help you make architectural decisions like which concurrency option to use, but once you've made that choice it will keep your code simple and clean.
Check it out!