Making lunch faster with Python concurrency

A lunchtime story to demonstrate threading, asyncio, multiprocessing & cloud functions

Date

Mar 17, 2021

Burger
Photo by amirali mirhashemian on Unsplash

Introduction

I'm going to tell a story to explain the different concurrency and parallelism options in Python.

In this story we will see how a single person multitasking is like concurrency, and multiple people doing their own tasks is like parallelism. We'll watch these scenarios in action at some well known lunchtime restaurants as they quickly and efficiently serve their customers. Then I'll implement these restaurants in Python, and finally I'll compare the different concurrency options and explain when to use each one.

I'll explain:

  • What the difference is between concurrency and parallelism
  • The different concurrency options and how they compare, including threading, asyncio, multiprocessing and cloud functions
  • The pros and cons of each concurrency option
  • How to select your concurrency option using a single flowchart

What is concurrency and parallelism?

Let's start with the definitions:

A system is said to be concurrent if it can support two or more actions in progress at the same time.

A system is said to be parallel if it can support two or more actions executing simultaneously.

The key concept and difference between these definitions is the phrase "in progress."

The Art of Concurrency

And now let's jump right into the story.

At lunchtime you turn down a street you never noticed before. It has two food options: a market stall called Concurrent Burgers, and a shop called Parallel Salads.

Both of them look delicious but have long queues so you wonder which one will serve you first.

Concurrent Burgers is run by a middle-aged lady with a Python tattoo on her arm who laughs heartily as she works. She's performing the following tasks:

  • Taking an order
  • Flipping burger patties
  • Filling a bap with salad, a patty and condiments, and completing the order

She switches between the tasks seamlessly. At one moment she's checking the patties on the grill and removing any that are cooked, the next she's taking an order, the next if any patties are ready, she makes a burger and completes the juicy order.

Parallel Salads is staffed by a number of identical men who display plastered-on smiles and politely converse as they work. Each of them makes a salad for a single customer. They take the order, add all the ingredients to a fresh bowl, smother it in dressing, exuberantly mix it up, fill a container with a healthy salad, and discard the bowl. Meanwhile one other clone takes dirty bowls and washes them.

The key difference between the two restaurants is the number of workers and how the tasks are being performed:

  • Concurrent Burgers has multiple tasks in progress at the same time (but not simultaneously), and a single worker who is switching between them.
  • Parallel Salads has multiple tasks in progress simultaneously and has multiple workers who are all doing one task at a time.

You notice that both restaurants are serving customers at the same rate. The woman in Concurrent Burgers is making multiple burgers at the same time and is limited by the rate that her tiny grill will output cooked patties. Parallel Salads is employing multiple men to make a single salad at a time and is limited by the length of time it takes to put a salad together.

You realise that Concurrent Burgers is I/O bound and Parallel Salads is CPU bound:

  • I/O bound means that a program is limited by the I/O subsystem, which in computer terms means reading from disk or performing network requests. In Concurrent Burgers it is cooking patties.
  • CPU bound means that a program is limited by the speed of the CPU. If the CPU went faster the program would go faster. In Parallel Salads it is the speed of the person making the salad.

Unable to make a decision you remain locked in the same state for five minutes looking bewildered, before an opinionated friend interrupts you and invites you to join them in a queue.

N.B. Parallel Salads is concurrent as well as parallel, as 'two or more actions are in progress at the same time'. Parallel processing is a subset of concurrent processing.

These two shops provide an intuition for the difference between concurrent and parallel tasks. We'll now investigate how to implement both of these in Python.

What are the options?

Python has two options available for concurrency:

  • threading
  • asyncio

It has this library built in for parallelism:

  • multiprocessing

There is another option for parallelism when running Python programs in the cloud:

  • cloud functions

Concurrency in practice

Let's look at two possible implementations of Concurrent Burgers using threading and asyncio. In both cases there's a single worker taking orders, cooking patties, and making burgers.

With both threading and asyncio only a single processor is running but it is jumping between the different tasks it needs to do. The difference between threading and asyncio is how the decision to change task is taken.

  • In threading the operating system knows about the different threads and will interrupt them at any point and change to a different task. The program itself has no control over this. This is called pre-emptive multitasking since the operating system can pre-empt your thread to make the switch. In most programming languages threads run in parallel, but in the standard CPython interpreter the global interpreter lock (GIL) allows only one thread to execute Python code at a time.
  • With asyncio the program itself decides when to switch between tasks. Each task cooperates with the other tasks by giving up control when it is ready to switch. For this reason it is called cooperative multitasking since each task must cooperate by giving up control when it can no longer make progress.
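Cooperative switching can be seen in a few lines. This is a minimal sketch, not part of the restaurant code: the `worker` coroutine and the task names are made up for illustration, and `asyncio.sleep(0)` is used purely as a point where a task hands control back.

```python
import asyncio


async def worker(name, steps, log):
    for step in range(steps):
        log.append((name, step))
        # The only place this task can be switched out:
        # it explicitly gives control back to the event loop
        await asyncio.sleep(0)


async def main():
    log = []
    # Run both workers concurrently on a single thread
    await asyncio.gather(worker("orders", 2, log), worker("grill", 2, log))
    return log


log = asyncio.run(main())
print(log)
```

The two workers alternate entries in the log because each one only gives way at its `await`. With threads the interleaving would be chosen by the operating system instead.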

Concurrent Burgers with threading

With threading the worker switches tasks at any time during the execution. The worker is half way through taking an order when she suddenly switches to checking the patties or making a burger, before again switching at any point to one of the other tasks.

Let's have a look at Concurrent Burgers implemented using threads:

from concurrent.futures import ThreadPoolExecutor
import queue
import time


# Note: Some methods and variables are skipped
#       to focus only on the threading details


def run_concurrent_burgers():
    # Create blocking queues
    customers = queue.Queue()
    orders = queue.Queue(maxsize=5)  # Process up to 5 orders at once
    cooked_patties = queue.Queue()

    # The grill is entirely independent of the worker,
    # and turns raw patties into cooked patties.
    # This is like reading from disk or doing a network request
    grill = Grill()

    # Run the three tasks using a thread pool executor
    with ThreadPoolExecutor() as executor:
        executor.submit(take_orders, customers, orders)
        executor.submit(cook_patties, grill, cooked_patties)
        executor.submit(make_burgers, orders, cooked_patties)


def take_orders(customers, orders):
    while True:
        customer = customers.get()
        order = take_order(customer)
        orders.put(order)


def cook_patties(grill, cooked_patties):
    # Fill every position on the grill with a raw patty
    for position in range(len(grill)):
        grill[position] = raw_patties.pop()

    while True:
        for position, patty in enumerate(grill):
            if patty.cooked:
                cooked_patties.put(patty)
                grill[position] = raw_patties.pop()

        # Don't check again for another minute
        time.sleep(60)


def make_burgers(orders, cooked_patties):
    while True:
        patty = cooked_patties.get()
        order = orders.get()
        burger = order.make_burger(patty)
        customer = order.shout_for_customer()
        customer.serve(burger)

Each of the tasks of taking orders, cooking patties and making burgers is an infinite loop that keeps performing its action.

In run_concurrent_burgers we start each of the tasks in a separate thread. We could manually create a thread for each task but there is a much nicer interface called ThreadPoolExecutor, which runs each task that we submit to it in one of its worker threads.
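To see what the executor buys us, here is a small sketch under some assumptions: `cook_patty` and `cook_all` are hypothetical helpers, and `time.sleep` stands in for waiting on the grill (or any other I/O).

```python
import time
from concurrent.futures import ThreadPoolExecutor


def cook_patty(patty_id):
    time.sleep(0.2)  # stand-in for waiting on I/O
    return f"patty-{patty_id} cooked"


def cook_all(patty_ids):
    # One worker thread per patty so all the waits overlap
    with ThreadPoolExecutor(max_workers=len(patty_ids)) as executor:
        # map returns results in the same order as the inputs
        return list(executor.map(cook_patty, patty_ids))


start = time.perf_counter()
results = cook_all([1, 2, 3, 4])
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")
```

Run one after another the four waits would take about 0.8 seconds. Because the threads spend their time waiting rather than computing, the waits overlap and the total should be much closer to a single wait.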

When using multiple threads we must ensure only one thread at a time is reading or writing any piece of state. Otherwise we could end up in a situation where two threads are holding the same patty and we end up with a rather angry customer; this issue is known as thread safety.

To avoid this issue we use Queues to pass state around. Within the individual tasks, the Queues block when calling get until there's a customer, order, or patty ready. The operating system won't try to switch to any thread that is blocked which gives us an easy way to safely hand off state. As long as the thread putting the state onto a Queue doesn't use it again, then the thread getting the state knows that it won't be changed while it is using it.
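The hand-off described above can be sketched with two threads and one queue. The `SENTINEL` value used to stop the consumer is an addition for this example (the restaurant loops run forever), and the function names are illustrative only.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker meaning "no more patties"


def grill(cooked_patties, count):
    for patty_id in range(count):
        # After put() this thread never touches the patty again
        cooked_patties.put(f"patty-{patty_id}")
    cooked_patties.put(SENTINEL)


def make_burgers(cooked_patties, burgers):
    while True:
        patty = cooked_patties.get()  # blocks until a patty is ready
        if patty is SENTINEL:
            break
        burgers.append(f"burger with {patty}")


def run():
    cooked_patties = queue.Queue()
    burgers = []  # only written by the consumer thread
    producer = threading.Thread(target=grill, args=(cooked_patties, 3))
    consumer = threading.Thread(target=make_burgers, args=(cooked_patties, burgers))
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return burgers


print(run())
```

Each patty is owned by exactly one thread at a time, so no locks are needed beyond the ones inside `queue.Queue` itself.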

threading pros

  • I/O doesn't stop other tasks progressing
  • Excellent Python version and library support - if it can be run single threaded it's highly likely it can be run multi threaded

threading cons

  • Slower than asyncio due to overhead of switching between system threads
  • Not thread safe
  • No speed up for CPU bound problems like making salads (due to Python only allowing a single thread to run at once) - a single worker making multiple salads at the same time would not be faster than if they made them one after another, since each salad still takes the same time to make in total.

Concurrent Burgers with asyncio

In asyncio there is a single event loop which manages all the tasks. Tasks can be in a number of different states but the most important two are ready or waiting. On each loop the event loop checks to see if any waiting tasks are now ready due to another task completing. Then it picks a ready task and runs it until the task completes or needs to wait for another task. Often this will be an I/O operation like reading from the disk or making an HTTP request.

There are two keywords that cover the majority of asyncio uses: async and await.

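  • async def declares a coroutine function. Calling it does not run the body straight away; it returns a coroutine that runs once it is awaited or scheduled as a task on the event loop.
  • await suspends the current task until the thing being awaited has finished, giving control back to the event loop so that other tasks can run. The suspended task is in the waiting state and becomes ready again when the awaited operation completes.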
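A minimal sketch of the two keywords in use. The `take_order` coroutine here is a stand-in written for this example, with a short `asyncio.sleep` playing the part of talking to the customer.

```python
import asyncio


async def take_order(customer):
    # Pretend to wait for the customer to decide
    await asyncio.sleep(0.01)
    return f"order for {customer}"


async def main():
    # Calling an async function only creates a coroutine object;
    # nothing inside take_order has run yet
    coro = take_order("Alice")
    is_coroutine = asyncio.iscoroutine(coro)

    # Awaiting it runs the body and hands back its return value
    order = await coro
    return is_coroutine, order


is_coroutine, order = asyncio.run(main())
print(is_coroutine, order)
```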

Let's have a look at Concurrent Burgers implemented using asyncio:

import asyncio

# Note: Some methods and variables are skipped
#       to focus only on the asyncio details


async def run_concurrent_burgers():
    # These queues give up control
    customers = asyncio.Queue()
    orders = asyncio.Queue(maxsize=5)  # Only process up to five orders at once
    cooked_patties = asyncio.Queue()

    # The grill runs entirely independently of the worker,
    # and turns raw patties into cooked patties
    grill = Grill()

    # Schedule all tasks on the running event loop and wait for them.
    # Start the whole thing with asyncio.run(run_concurrent_burgers())
    await asyncio.gather(
        take_orders(customers, orders),
        cook_patties(grill, cooked_patties),
        make_burgers(orders, cooked_patties),
    )


# Declare asyncio tasks with async def
async def take_orders(customers, orders):
    while True:
        # Allow switching to another task here
        # and at all other awaits
        customer = await customers.get()
        order = take_order(customer)
        await orders.put(order)


async def cook_patties(grill, cooked_patties):
    for position in range(len(grill)):
        grill[position] = raw_patties.pop()

    while True:
        for position, patty in enumerate(grill):
            if patty.cooked:
                # put_nowait allows us to add to the queue without
                # giving up control to the event loop
                cooked_patties.put_nowait(patty)
                grill[position] = raw_patties.pop()

        # Wait 30 seconds before checking again
        await asyncio.sleep(30)


async def make_burgers(orders, cooked_patties):
    while True:
        patty = await cooked_patties.get()
        order = await orders.get()
        burger = order.make_burger(patty)
        customer = await order.shout_for_customer()
        customer.serve(burger)

Each of the tasks of taking orders, cooking patties and making burgers is declared with async def.

Within these tasks the worker can switch to another task each time await is used. This happens:

  • While taking orders
    • when about to speak to the next customer
    • when adding the order to the orders queue
  • While cooking patties
    • when all the patties have been checked
  • While making burgers
    • when waiting for a cooked patty
    • when waiting for an order
    • when finding a customer to give them their burger

The final piece of the puzzle is run_concurrent_burgers, which calls asyncio.gather to schedule all the tasks to be run by the event loop, which in this case is our worker.

As we know exactly when the tasks switch we don't need to be as careful about sharing state. We could implement this using just lists for the queues and know that two tasks wouldn't accidentally be holding the same patty. However, using asyncio queues is highly recommended as they allow the tasks to cooperate very easily by providing sensible points for suspending the current task.
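Here is a small sketch of why shared state is easier to reason about in asyncio. The `stats` dictionary and `serve_customers` coroutine are invented for this example; the point is that a read-modify-write with no `await` in the middle cannot be interrupted by another task.

```python
import asyncio


async def serve_customers(stats, count):
    for _ in range(count):
        current = stats["served"]
        # There is no await between the read and the write,
        # so no other task can run in between
        stats["served"] = current + 1
        # Give the other tasks a turn
        await asyncio.sleep(0)


async def main():
    stats = {"served": 0}
    # Ten tasks all updating the same dictionary without any locks
    await asyncio.gather(*(serve_customers(stats, 100) for _ in range(10)))
    return stats["served"]


total = asyncio.run(main())
print(total)
```

Moving the `await` between the read and the write would bring the race back, so the guarantee only holds for code that runs between two switch points.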

One interesting aspect of using asyncio is that the async keyword changes the interface to the function as it cannot be directly called from non-async functions. This can be considered a good or bad thing. On the one hand you could say it hurts composability as you cannot mix asyncio and normal functions. On the other hand, if asyncio is only used for I/O, this forces a separation of I/O and business logic, restricting the asyncio code to the edge of the application and making the code base easier to understand and test. Explicitly labelling I/O is a fairly common practice in typed functional languages - in Haskell it is required.
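One way to picture that separation is sketched below. All of the names (`burger_price`, `fetch_order`, `handle_order`) and the pricing rule are hypothetical, and `asyncio.sleep` stands in for a real network or database call.

```python
import asyncio


def burger_price(order):
    """Pure business logic: no I/O, so it can be tested without an event loop."""
    return 5.0 + 0.5 * len(order["toppings"])


async def fetch_order(order_id):
    # Stand-in for I/O such as a database query or HTTP request
    await asyncio.sleep(0.01)
    return {"id": order_id, "toppings": ["cheese", "pickles"]}


async def handle_order(order_id):
    # The async edge of the application: do the I/O,
    # then hand plain data to the synchronous logic
    order = await fetch_order(order_id)
    return burger_price(order)


total = asyncio.run(handle_order(42))
print(total)
```

Only `handle_order` and `fetch_order` need the event loop. `burger_price` stays an ordinary function that can be called and tested from anywhere.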

Asyncio pros

  • Extremely fast for I/O bound activities
    • Less overhead than threading due to there being only a single system thread
    • All the fastest web server frameworks are using asyncio - check out some benchmarks
  • Thread safe

Asyncio cons

  • No speed up for CPU bound problems
  • Recently added to Python
    • Requires Python 3.5+
    • Library support is good for most I/O tasks but not as complete as for code that doesn't use asyncio

Parallelism in practice

At Parallel Salads there are multiple workers making salads in parallel, and we're going to implement it using multiprocessing.

Then we're going to visit Cloud Coffees to see how cloud functions can be used to run tasks in parallel.

Parallel Salads with multiprocessing

Running tasks in parallel across multiple processes is perfectly demonstrated by Parallel Salads.

Each worker in Parallel Salads is represented by a new process which is spawned by the operating system. These processes are created through the ProcessPoolExecutor which assigns each of them tasks.

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

# Note: Some methods and variables are skipped to
#       focus only on the multiprocessing details


def run_parallel_salads():
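    # Create multiprocessing queues that can
    # communicate across process boundaries.
    # Manager queues are used because a plain mp.Queue
    # cannot be passed as an argument to executor.submit
    manager = mp.Manager()
    customers = manager.Queue()
    bowls = manager.Queue()
    dirty_bowls = manager.Queue()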

    # Run the tasks in parallel using a process pool executor
    with ProcessPoolExecutor(max_workers=NUM_STAFF) as executor:
        # Set all but one worker making salads
        for _ in range(NUM_STAFF - 1):
            executor.submit(make_salad, customers, bowls, dirty_bowls)

        # Set the other worker washing bowls
        executor.submit(wash_bowls, dirty_bowls, bowls)


def make_salad(customers, bowls, dirty_bowls):
    while True:
        customer = customers.get()
        order = take_order(customer)
        bowl = bowls.get()
        bowl.add(ingredients)
        bowl.add(dressing)
        bowl.mix()
        salad = fill_container(bowl)
        customer.serve(salad)
        dirty_bowls.put(bowl)


def wash_bowls(dirty_bowls, bowls):
    while True:
        bowl = dirty_bowls.get()
        wash(bowl)
        bowls.put(bowl)

With multiprocessing each task is run in a separate process. These processes are independently run in parallel by the operating system and don't block each other. The number of processes that can actually be run in parallel is limited by the number of cores on the CPU so we would limit the number of staff actually making salad to this.

As these tasks are in different processes they don't share any normal Python state. Each process has an independent copy of the whole program state. We must use special multiprocessing queues to communicate between them.
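The lack of shared state can be demonstrated with a short sketch. The names are made up for this example, and it assumes a POSIX system where the "fork" start method is available; on other platforms you would use the default context and guard the entry point with `if __name__ == "__main__":`.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

salads_made = 0  # module-level state, copied into each worker process


def make_salad(order_id):
    global salads_made
    salads_made += 1  # only changes this worker process's own copy
    return f"salad-{order_id}"


def run_salads(order_ids):
    # Assumption: "fork" is available (Linux/macOS)
    ctx = mp.get_context("fork")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        # Return values are sent back to the parent process
        return list(executor.map(make_salad, order_ids))


salads = run_salads([1, 2, 3])
print(salads, salads_made)
```

The salads come back through return values, but the counter in the parent process is still zero because each worker incremented its own copy.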

Aside - asyncio and multiprocessing

One use case for multiprocessing is for offloading CPU intensive tasks in an asyncio application to stop them blocking the rest of the application. Here's a short sketch of how to do that:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

process_pool = ProcessPoolExecutor()  # Default size is number of cores


async def handle_long_request(n):
    event_loop = asyncio.get_running_loop()
    # calculate_n_pi will be run in a separate process allowing the asyncio event
    # loop to continue to handle other async tasks in parallel
    return await event_loop.run_in_executor(process_pool, calculate_n_pi, n)


def calculate_n_pi(n):
    # Stand-in for a long-running, CPU intensive calculation
    time.sleep(60)
    return n * 3.14

Multiprocessing pros

  • Speeds up CPU bound tasks
  • Thread safe
  • Can be used to run long calculations from web servers in separate processes

Multiprocessing cons

  • No sharing of resources
  • High overhead - do not use for I/O bound tasks

Cloud Coffees with Cloud Functions

As you and your friend walk to the park to eat your lunch you spot a fluffy multicoloured cloud hovering above a group of people. You peer closer and see a sign for Cloud Coffees.

Even though your friend hates coffee you both decide to grab one for fun. As you walk up you are each confronted by your own individual stall with a single barista which slowly floats down out of the cloud. You place your order and the barista makes your coffee and serves it to you.

A boisterous crowd of people suddenly arrive at Cloud Coffees, and after a short wait for more stalls to float down they are all quickly served. These extra baristas wait around for a little while for more customers, completely oblivious to the other stalls, before floating back into the cloud.

As you step back you see that at any one time there are about the same number of stalls as there are customers placing an order. If more customers arrive, more stalls appear out of the cloud. When orders are complete, after a little wait the stalls disappear back into the cloud.

Your friend has asked for a ridiculously complex order to try and drown out the taste of coffee and still hasn't got his drink. The barista is adding marshmallows and a chocolate flake when suddenly the barista throws the whole coffee unceremoniously in the bin and shouts "Timeout" at him.

You both walk off to the park in hysterics.

Cloud functions are another option worth considering if you're writing a web service. They are by far the easiest to write: as only a single order is fulfilled at once, you can completely forget about concurrency.

def cloud_coffees(order):
    ground_coffee = grind_beans()
    coffee = brew_coffee(ground_coffee)
    coffee.add_embellishments(order)
    return coffee

Each request is fulfilled by a separate instance of the whole application. When a new instance is created there is a little latency involved as it starts up. For this reason an instance may stick around waiting for more requests which will be fulfilled with almost no latency. After a little while if there are no requests it will be reclaimed.

Each request will time out after a few minutes depending on the implementation. You have to make sure that your tasks complete before this timeout or they will vanish without finishing.

Instances cannot communicate with other instances and should never store any state between requests as the instance could disappear at any point.
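In practice this means everything a request needs should arrive with the request, and everything worth keeping should leave in the response (or go to external storage). The handler below is a hypothetical sketch; its name and the shape of the request dictionary are assumptions and do not match any particular cloud provider's API.

```python
def handle_coffee_request(request):
    """Hypothetical stateless handler: no module-level state is read or written."""
    drink = request["drink"]
    extras = request.get("extras", [])
    # Everything the caller needs is returned; nothing is
    # remembered by this instance for the next request
    return {"drink": drink, "extras": sorted(extras), "status": "served"}


response = handle_coffee_request(
    {"drink": "latte", "extras": ["marshmallows", "chocolate flake"]}
)
print(response)
```

Because the function depends only on its input, it gives the same answer whichever instance happens to serve the request.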

The most common implementations of this are AWS Lambda, Azure Functions and Google Cloud Functions.

Cloud function pros

  • Extremely simple model
  • Can be cheaper than running a persistent server
  • Scales with no effort

Cloud functions cons

  • Can have latency issues when starting new instances
  • Requests have timeout limits
  • Less control over Python version - you can only use versions the cloud provider makes available

Which concurrency option should you use?

Let's summarise everything we've talked about into a single table.

| |threading|asyncio|multiprocessing|cloud functions|
|---|---|---|---|---|
|Concurrency Type|Pre-emptive multitasking|Cooperative multitasking|Multiprocessing|Multi-instance|
|Concurrent or parallel|Concurrent|Concurrent|Parallel|Parallel|
|Explicit control of concurrency|False|True|False|False|
|Switching decision|The operating system decides when to switch tasks|The tasks decide when to give up control|The processes run at the same time in different CPU cores|The requests run at the same time in different instances|
|Maximum parallel processes|1|1|Number of cores in CPU|Unlimited|
|Communication between tasks|Shared state|Shared state|Multiprocessing queues and return values|Impossible|
|Thread safe|False|True|True|True|
|Tasks to use on|I/O bound|I/O bound|CPU bound|CPU bound if it's less than timeout (~5 minutes)|
|Task Overhead|A system thread per task consumes RAM and increases switching time between tasks|As little as possible, all tasks are run in a single process in a single thread|A system process per task consumes more RAM and switching time than threading|Starting new instances comes with a latency cost|

Now that you understand all the options it becomes easy to select one.

Before doing this though you should double check that you actually need to speed up your tasks. If it's being run once a week and takes 10 minutes, is there any point in speeding it up?

If it is, then just refer to this flowchart:

Concurrency flowchart

Conclusion

You've now seen examples of the core concurrency options available in Python:

  • threading
  • asyncio
  • multiprocessing

as well as a deployment option that provides a simplified environment for parallel Python:

  • cloud functions

You also know the differences between them, the pros and cons of each, and when to select each option.

I hope you find this useful!

Sourcery

Sourcery helps you write better code faster by understanding your existing code and suggesting refactorings to improve its quality.

It won't help you make architectural decisions like which concurrency option to use, but once you've made that choice it will keep your code simple and clean.

Check it out!
