Weeks 9 - 10: Parallel processing in Python
Python offers support for two common models for concurrent computation:
- Threading: Programs running in parallel in a shared Python environment.
- Multiprocessing: Programs running in parallel in separate Python environments.
Threading and multiprocessing allow us to "distribute" the work of a program across multiple threads and/or processes by splitting it into a collection of subtasks that can be executed in parallel. Some examples follow:
- Parallel array sum: suppose we want to compute the sum of the elements of a very large array. We can "split" the array into non-overlapping blocks and have a separate thread compute the partial sum of each block. Finally, we output the sum of the partial sums.
- Parallel matrix-vector multiplication: to compute the product \( x \mapsto Ax \) efficiently, we can parallelize it by computing \( A_{i,:} x \) (where \( A_{i,:} \) is the \( i \)-th row of \( A \)) concurrently in separate threads or processes.
Threading
We begin our discussion on threads with a somewhat contrived example: we wish to increment a counter \( N \) 5 times, and perform each increment in a separate thread.
import threading
import time

N = 0

def worker():
    global N
    print(threading.current_thread().name, 'Starting', N)
    M = N
    print(M)
    N = M + 1
    time.sleep(1)
    print(threading.current_thread().name, 'Exiting', N)

for i in range(5):
    w = threading.Thread(target=worker)
    w.start()
The above example demonstrates the standard workflow of the threading module:
- A "master" process spawns a number of threads, using the threading.Thread object and its target parameter to specify what each thread should do.
- Threads spawned by the same process have shared memory, and can access and modify the same resources; in the above example, they all try to increment the same variable N.
- The master process does not "pause" while the threads are active.
- Important: threads typically execute out-of-order. For example, the first thread created need not be the first one to complete its work.
The second point above implies that threading code can lead to race conditions!
- Multiple threads try to access and modify the same resource concurrently.
- Special care is needed to ensure the output is deterministic.
Race conditions
To better demonstrate how a race condition can affect the output of our program, consider the simplest case where we only have two threads and let us focus on the following part of our code:
def worker():
    global N
    M = N
    N = M + 1
Here, the order of execution matters! For example, consider the following sequence of events:
- Thread 1 reaches the M = N statement; it creates a local variable M = 0, since N = 0 initially.
- Thread 2 reaches the M = N statement and does the same thing as Thread 1. Since Python variables are local by default, this M is different from the M that Thread 1 sees.
- Thread 1 executes N = M + 1, setting N = 1.
- Thread 2 executes N = M + 1. This also sets N = 1.
After both threads terminate, we have N = 1 instead of N = 2, which would be the "expected" behavior.
Synchronization
To prevent race conditions, we have a number of so-called synchronization mechanisms at our disposal. The simplest one is called a lock! At a high level, a lock is an object that can be "held" by only one thread at a time, so we can use it to designate "safe" regions as below:
from threading import Thread, Lock

N = 0
lock = Lock()

def worker():
    global N
    lock.acquire()  # enter safe region
    M = N
    N = M + 1
    lock.release()  # leave safe region
In the above code, a thread must acquire the lock before entering the safe region. If it cannot, another thread is currently inside the safe region, and it must wait until the lock is released, preventing any unwanted changes. The lock is released upon exiting the safe region, enabling other threads to enter it.
Locks as context managers
Enclosing critical regions between lock.acquire() and lock.release() statements is somewhat cumbersome and prone to human error (e.g., one could forget to add the .release() statement). Luckily, we can use locks as context managers via Python's with statement, as below:
with lock:
    do_something()
The above snippet of code is equivalent to writing
lock.acquire()
do_something()
lock.release()
For that reason, we will use the context manager syntax for locks in our examples from now on.
Waiting for threads to stop
In addition to the Thread.start() method, which causes a thread to begin running, we have the Thread.join() method, which blocks until the thread has finished. For example:
from threading import Thread, Lock

N = 0
lock = Lock()

def worker():
    global N
    lock.acquire()  # enter safe region
    M = N
    N = M + 1
    lock.release()  # leave safe region

threads = []
for i in range(5):
    t = Thread(target=worker)
    t.start()
    threads.append(t)

# wait until they finish
for t in threads:
    t.join()

print(f"Value of N is: {N}")
Note that we have to keep track of the threads created in a list in order to be able to wait for them to finish in the future.
Question
What would happen if we rewrote the final few lines of the above program as below?
for i in range(5):
    t = Thread(target=worker)
    t.start()
    t.join()
Answer
Since the join() function waits / "blocks" until a thread has finished its execution, the above code would be equivalent to having no parallelization at all.
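To see the difference, one can time both variants with a worker that simply sleeps; the snippet below is a sketch in which a one-second sleep stands in for real (I/O-bound) work.

import time
from threading import Thread

def worker():
    time.sleep(1)  # stand-in for one second of work

# join() inside the loop: each thread finishes before the next one starts
start = time.time()
for i in range(5):
    t = Thread(target=worker)
    t.start()
    t.join()
print("sequential:", time.time() - start)  # roughly 5 seconds

# start all threads first, then join: the sleeps overlap
start = time.time()
threads = [Thread(target=worker) for _ in range(5)]
for t in threads: t.start()
for t in threads: t.join()
print("overlapping:", time.time() - start)  # roughly 1 second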
Passing arguments to threads
So far we have used the target
parameter to designate the code to be executed by each
thread, but the target functions did not accept any parameters. When the target function
expects a set of arguments, we can pass them using the args
parameter of the Thread
object as shown below:
from threading import Thread

def worker(arg1, arg2):
    print(arg1 + arg2)

for i in range(5):
    t = Thread(target=worker, args=(i, i + 1))
    t.start()
A more realistic example of passing arguments will follow later, as we try to speed up a sorting algorithm using threads.
Other synchronization mechanisms
Locks are not the only option for synchronization between threads. In threading, we have access to several synchronization primitives:
- Mutex: a fancy name for a lock; designates a "safe" region that only one thread can enter at a time.
- Semaphore: a "counting" version of the lock that allows multiple threads to enter a region. A semaphore can be treated as a counter that works as follows:
  - before a thread enters a safe region, it tries to decrement the value of the semaphore by 1;
  - when a thread leaves a safe region, it increments the value of the semaphore by 1;
  - the value of the semaphore cannot drop below 0; if a semaphore has value 0 and a thread tries to decrement it, it will "block" until it is able to do so without violating this requirement.
- Barrier: a barrier indicates that a thread must block until all other threads spawned by the same process reach the barrier. (A short sketch of these primitives follows below.)
Note that the above synchronization mechanisms are not thread-specific and are also available and necessary in a multiprocessing environment.
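As a quick illustration of the last two primitives, here is a minimal sketch (hypothetical, not part of the lecture code): a Semaphore allows at most two threads into a region at a time, and a Barrier makes all threads wait for each other before continuing.

import time
from threading import Thread, Semaphore, Barrier

sem = Semaphore(value=2)   # at most 2 threads inside the region at once
barrier = Barrier(5)       # all 5 threads must arrive before any proceeds

def worker(i):
    with sem:              # decrement on entry, increment on exit
        print(f"thread {i} inside the limited region")
        time.sleep(0.1)
    barrier.wait()         # block until all 5 threads reach this point
    print(f"thread {i} passed the barrier")

threads = [Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads: t.start()
for t in threads: t.join()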
The producer-consumer problem
In the producer-consumer problem, we have the following setup: one or more threads generate data, and a disjoint collection of threads "consume" and process them. The data must be processed in FIFO order: the first piece of data generated must be the first to get processed, and so on. In particular, we can assume that we have a FIFO data structure such as a queue shared between the different threads / processes.
For the producer-consumer framework with a single producer, a semaphore is necessary for synchronization so as to prevent any threads from attempting to consume data when none are available. This would not be possible with a lock (at least not without additional modifications):
from collections import deque
import threading

queue = deque()
sem = threading.Semaphore(value=0)

def producer():
    global queue
    while True:
        new_data = generate_data()
        queue.append(new_data)
        sem.release()  # increment semaphore value by 1

def consumer():
    global queue
    sem.acquire()  # attempt to decrement value by 1
    consume(queue.popleft())  # if successful, get next item from queue
Combining synchronization mechanisms
In the above example, the following two operations will likely be executed concurrently:
# CONSUMERS
queue.popleft()
# PRODUCER
queue.append(new_data)
Depending on the implementation of the queue
, the above operations may not be
thread-safe, i.e., running them concurrently might produce inconsistent results
or even mess up the internals of the queue
data structure. To guard against this,
we could introduce an additional lock in our code:
queue = deque()
sem = threading.Semaphore(value=0)
lock = threading.Lock()

def producer():
    global queue
    while True:
        new_data = generate_data()
        with lock:
            queue.append(new_data)
        sem.release()

def consumer():
    global queue
    sem.acquire()
    with lock:
        consume(queue.popleft())
In practice, many Python libraries offer thread-safe implementations for us.
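For example, the standard library's queue.Queue is a thread-safe FIFO queue whose get() method blocks until an item is available, so the producer-consumer example above can be written without an explicit semaphore or lock. A minimal sketch follows (generate_data is a hypothetical stand-in for real data generation):

import queue
from threading import Thread

q = queue.Queue()  # thread-safe FIFO; no extra locking needed

def generate_data():
    return 42  # hypothetical stand-in for real data generation

def producer():
    for _ in range(10):
        q.put(generate_data())  # put() is thread-safe

def consumer():
    for _ in range(10):
        item = q.get()          # get() blocks until an item is available
        print("consumed", item)

p, c = Thread(target=producer), Thread(target=consumer)
p.start(); c.start()
p.join(); c.join()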
A more involved example: multi-threaded sorting
Suppose we have \( T \) available threads and want to sort an array of numbers.
In addition, suppose we have an implementation of sort(A)
that sorts an array
in-place with running time \( O(n \log n) \), where \( n \) is the size of the
input array.
In what follows, we will parallelize our sorting algorithm so that its (wall-clock) running time becomes \( O\left( \frac{n}{T} \log \frac{n}{T} + n \log T \right) \).
For \(T = \Theta(\log n)\), the above yields an asymptotically faster running time.
The idea is straightforward: we will split our input array \( A \) into disjoint blocks \( A_1, \dots, A_T \), each of size \( \frac{n}{T} \) (assuming \( n \) is divisible by \( T \) for simplicity). Then:
- We will call sort on each block \( A_i \) in a separate thread. Since sorting is done in place, this will result in a collection of \( T \) sorted arrays.
- We will use an \( O(n \log T) \) algorithm for merging \( T \) sorted arrays in the parent process.
Even though we are using threading, the above algorithm has no race conditions, since each thread handles a disjoint block of the input array \( A \). The complete algorithm follows below:
from math import ceil
from threading import Thread
import heapq

def sort(A, start, end):
    A[start:end] = sorted(A[start:end])

def merge(arrays):
    # merge T sorted arrays into a single sorted list in O(n log T) time
    return list(heapq.merge(*arrays))

def sort_parallel(A, T):
    bsize = ceil(len(A) / T)
    threads = []
    for i in range(T):
        start, end = bsize * i, min(bsize * (i + 1), len(A))
        t = Thread(target=sort, args=(A, start, end))
        t.start()
        threads.append(t)
    # wait until all threads have finished
    for t in threads:
        t.join()
    return merge([A[i*bsize:min((i+1)*bsize, len(A))] for i in range(T)])
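As a quick sanity check (assuming the functions above have been defined), one could run something like the following:

import random

A = [random.randint(0, 1000) for _ in range(10**5)]
result = sort_parallel(A, T=4)
assert result == sorted(A)  # the parallel sort agrees with the built-in sort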
More questions
Question
In the above examples, we used global N to edit the variable N in different threads. However, sort_parallel passes the array A as an argument. Can we do the same with N (i.e., can we change the above code snippet to look like below)?
from threading import Lock, Thread

lock = Lock()

def worker(N):
    with lock:
        M = N
        N = M + 1

N = 0
threads = []
for i in range(5):
    t = Thread(target=worker, args=(N,))
    t.start()
    threads.append(t)
Answer
The answer is no. Numbers are immutable in Python, so passing N as an argument effectively passes its value: inside worker, N is just a local name, and the assignment N = M + 1 rebinds that local name without affecting the caller's N (in other words, this would not work even if we didn't have threads).
On the other hand, lists and other mutable objects are effectively passed by reference: in-place modifications made through the parameter are visible to the caller, which is why sort_parallel worked as is.
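The following small sketch illustrates the same point without any threads:

def rebind(n):
    n = n + 1            # rebinds the local name only; the caller is unaffected

def mutate(lst):
    lst[0] = lst[0] + 1  # modifies the object in place; the caller sees the change

N = 0
rebind(N)
print(N)   # still 0

A = [0]
mutate(A)
print(A)   # [1]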
Python and threading
Threading in Python is limited and not really intended for CPU-intensive tasks. This is by design: Python has the so-called Global Interpreter Lock (GIL), which ensures that only one thread executes Python bytecode at any given time within a single interpreter. Threads can still be interleaved (and finish out of order), but their bytecode cannot run in parallel.
The threading
library is still very useful for IO-bound tasks (e.g.,
downloading several files in parallel). However, for CPU-bound tasks, the
multiprocessing
library presented below is the recommended approach to
achieve parallelism.
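For instance, here is a sketch of fetching several pages concurrently with threads; the GIL is released while each thread waits on the network, so the downloads overlap. The URL list is a placeholder (any reachable URLs would do).

from threading import Thread
from urllib.request import urlopen

urls = ["https://example.com"] * 3  # placeholder URLs

def download(url):
    with urlopen(url) as response:  # the GIL is released while waiting on I/O
        data = response.read()
    print(url, len(data), "bytes")

threads = [Thread(target=download, args=(u,)) for u in urls]
for t in threads: t.start()
for t in threads: t.join()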
Multiprocessing
Multiprocessing in Python is possible via the homonymous library, which allows a
"master" or base process to spawn several other Python processes, each of them
effectively running in a separate Python environment. For reference, let us see
how we would rewrite the parallel increment example from the previous section
in multiprocessing
:
import multiprocessing as mp

def worker(N, lock):
    with lock:
        N.value += 1  # modify using the `value` field

procs = []
N = mp.Value('i', 0)  # 'i': integer type
lock = mp.Lock()

# create 5 processes
for i in range(5):
    p = mp.Process(target=worker, args=(N, lock))
    p.start()
    procs.append(p)

# wait for all processes to stop
for p in procs: p.join()
Key differences:
- memory is no longer shared by default, so we can't use global N as we did before;
- instead, we use "special" constructs for sharing objects between processes:
  - Value for a shared scalar value,
  - Array for a shared array,
  - Queue for a shared queue that multiple processes can read/write to;
- synchronization mechanisms are still necessary when editing shared memory.
While the synchronization mechanisms from the multiprocessing library are similar (e.g., Lock, Semaphore, etc.), they have to be passed as arguments to the worker function explicitly.
Multiprocessing examples
Populating an array in parallel
Goal: suppose we have nproc
processes available, and wish to generate a random array with elements in the range [0, 1000]
and length N
in parallel.
The idea is to let each process populate a separate "block" of the array. To do so,
we will create a shared memory array (multiprocessing.Array
). Here are some
examples of defining such a shared array:
from multiprocessing import Array

N = 100
integer_array = Array('i', N)  # 'i': C int
double_array = Array('d', N)   # 'd': C double
Note
Shared memory arrays such as the above are statically typed (i.e., their type must be known in advance) since they get mapped to low-level structures to allow sharing between processes.
The above arrays can be indexed just like Python lists
. In addition, all their elements are initially set to 0
. With this at hand, we can write our program:
from math import ceil
from multiprocessing import Process, Array
import numpy as np

def worker(A, start, end):
    A[start:end] = np.random.randint(0, 1000, size=(end - start,))

if __name__ == "__main__":
    array = Array('i', 250)  # example: N = 250
    nproc = 16
    bsize = ceil(len(array) / nproc)
    procs = [None] * nproc
    for i in range(nproc):
        # split array into chunks
        start, end = i * bsize, min((i + 1) * bsize, len(array))
        p = Process(target=worker, args=(array, start, end))
        p.start()
        procs[i] = p
    # wait until they all end
    for p in procs: p.join()
The above snippet uses bsize
to determine the size of the blocks of array
handled by each process. Every different block is populated using calls to
np.random.randint
with appropriate size.
Common gotchas
Some important things to keep in mind when writing multiprocessing
code:
- Shared memory objects still need to be passed to the worker function as parameters.
- Make sure to use the if __name__ == "__main__" construct to ensure that only the main process executes the "main" part of the code!
Tip
In Python 3, ceil()
returns an integer; in Python 2, it would return a
float
. If you are using Python 2 (which is not recommended), you should
cast it to an integer:
bsize = int(ceil(len(array) / nproc))
Parallel array sum
Goal: We want to compute the sum of an array in parallel.
As in the previous example, we will split the array in separate blocks or "chunks" and assign each chunk to a different process. Each process will compute the partial sum of its assigned chunk, and the parent process will sum over the partial sums.
However, there is a nontrivial obstacle to overcome. How will the children processes communicate their results back to the parent process?
- We could add a return statement at the end of the worker function, but that would not help.
- Instead, we will use a multiprocessing.Queue to communicate return values.
The Queue class
A multiprocessing.Queue is a race-condition-free implementation of a FIFO queue, and can be used to communicate values back and forth between multiple processes. No synchronization mechanisms are necessary when using its get or put methods.
The program follows below:
import numpy as np
from multiprocessing import Process, Queue

def worker(A, start, end, queue):
    queue.put(sum(A[start:end]))

A = np.random.randint(0, 1000, size=(250,))
nproc = 16
bsize = (len(A) // nproc) + 1
procs = [None] * nproc
queue = Queue()

for i in range(nproc):
    # figure out subarray limits
    start, end = i * bsize, min((i + 1) * bsize, len(A))
    # start a process
    p = Process(target=worker, args=(A, start, end, queue))
    p.start()
    procs[i] = p

Asum = sum([queue.get() for i in range(nproc)])
for p in procs: p.join()
Getting elements from the Queue
Note the unintuitive order in which we are get()-ing the return values above:
Asum = sum([queue.get() for i in range(nproc)])
for p in procs: p.join()
One would expect that we would first join() the processes and get the return values from the Queue later. However, the correct pattern is the reverse, which we use here: a child process that has put items on a Queue may not exit until those items have been consumed, so joining before draining the queue can lead to a deadlock.
A more detailed explanation of why this is the correct pattern can be found in this StackOverflow thread.
Question
Why not use a multiprocessing.Array for the array A?
Answer
Since we are not changing the values of A
but rather only summing over them,
there is no need for A
to be shared; it is sufficient for each process to
receive its own copy of it.
Number of processes
To get the best out of parallelism, it is important to use the correct number of processes. The "ideal" situation is when all processes can run completely in parallel by assigning exactly one to each processor.
To determine this dynamically, you can use the os
library:
from os import cpu_count
nprocs = cpu_count()
# parallel code using `nprocs` processes in total
The dining philosophers problem
So far, our examples either had no race conditions or suffered from a simple race condition: many threads or processes are trying to edit a value in shared memory at the same time, causing unpredictable results. However, there are (more rare) instances of race conditions that lead to deadlocks.
Dining philosophers problem: Suppose that 5 philosophers are sitting at a circular table with just 5 chopsticks, and the following hold:
- philosophers alternate between thinking and eating;
- 1 chopstick is placed between each pair of adjacent philosophers;
- to eat, a philosopher must pick up both their right and left chopsticks.
Note
The above is just a pedagogical instantiation of a more abstract model:
- each philosopher is a process;
- "thinking" means that the process is doing some CPU-bound task;
- "eating" is some resource-bound task;
- "chopsticks" are resources shared between multiple processes.
The example is due to Edsger Dijkstra (1965).
The dining philosophers problem presents a twofold set of challenges:
- Since philosophers are competing for chopsticks at the same time, we need to introduce synchronization mechanisms to ensure consistent results (i.e., two adjacent philosophers should not be eating at the same time).
- Even with synchronization, the system can reach a state where no progress can be made! This is a so-called deadlock.
Deadlock
Consider the following proposed solution to the dining philosophers problem:
- A philosopher will think() until the left chopstick becomes available.
- When the left chopstick becomes available, they pick it up and wait for the right chopstick.
- When the right chopstick becomes available, they pick it up and eat().
- Put the right chopstick down and then the left chopstick down (the order does not matter).
- Repeat from the beginning.
Problem: if all philosophers pick up their left chopstick simultaneously, no progress can be made! Philosopher 1 is waiting for the chopstick on their right, which is held by Philosopher 2. Philosopher 2 is waiting for a chopstick held by Philosopher 3, and so on.
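The naive strategy is easy to reproduce in code. The sketch below is a hypothetical illustration (expect it to hang and require a manual interrupt): every thread grabs its left chopstick, sleeps briefly, and then waits forever for its right one.

import time
from threading import Thread, Lock

chopsticks = [Lock() for _ in range(5)]

def naive_eat(i):
    left, right = i, (i + 1) % 5
    with chopsticks[left]:       # every philosopher grabs the left chopstick...
        time.sleep(0.1)          # ...and by now all left chopsticks are taken
        with chopsticks[right]:  # everyone blocks here forever: deadlock
            print(f"Philosopher {i} eats")

threads = [Thread(target=naive_eat, args=(i,)) for i in range(5)]
for t in threads: t.start()
for t in threads: t.join()       # never returns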
To avoid the resulting deadlock, E. Dijkstra proposed imposing an ordering on the chopsticks:
- Let \(i \in \{0, \dots, 4\}\) be the index of the philosophers (e.g., clockwise).
- The chopstick on the left of philosopher \(i\) is given index \(i\), and the one on the right is given index \((i + 1) \; \mathrm{mod} \; 5\).
- Waiting is done in order. In particular, philosopher \(i\):
  - waits for chopstick \(i\) first if \(i < (i + 1) \; \mathrm{mod} \; 5\);
  - otherwise, waits for the other chopstick first.
Why is this helpful? It is immediate that the previous scenario that was causing the deadlock can no longer happen. In particular, if 4 philosophers pick up the chopsticks on their left side at the same time, the 5th one will not be able to pick up either; their second chopstick therefore remains free for a neighbor, so at least one philosopher can always make progress.
A solution using threading
follows below (the multiprocessing
version of this will be similar, subject to some bookkeeping):
from random import randint
from time import sleep
from threading import Thread, Lock

chopsticks = [Lock() for _ in range(5)]  # 5 chopsticks

def eat(i):
    # always wait for the lower-indexed chopstick first
    first, last = min(i, (i + 1) % 5), max(i, (i + 1) % 5)
    while True:
        print("Philosopher %d waits for first chopstick" % i)
        with chopsticks[first]:
            print("Philosopher %d waits for second chopstick" % i)
            with chopsticks[last]:
                sleep(randint(0, 2))  # "EAT" for some time

threads = [Thread(target=eat, args=(i,)) for i in range(5)]
for t in threads: t.start()
for t in threads: t.join()
Note
The dining philosophers problem is simple to resolve due to the dependence structure obeyed by the processes and the shared resources. However, modern operating systems have to solve similar problems in vastly different scales (with hundreds or thousands of shared resources).
In those cases, "hand-made" solutions may be inadequate or simply too tedious to derive, calling for advanced synchronization mechanisms. Monitors are an example of such a mechanism.
Process Pools
Finally, multiprocessing
introduces an object called the process Pool
, which automates certain parts of process management (e.g., starting and stopping, as well
as obtaining return values, etc.).
Process pools offer several methods - some commonly used ones are summarized below:
- Pool.map(f, iterable): apply the function f to each element in iterable.
- Pool.apply(f, args): call the function f with arguments args.
- Pool.starmap(f, iterable): similar to map, but with a subtle difference. Suppose that f has the following signature:
      def f(arg1, arg2): return arg1 + arg2
  Running Pool.map(f, [(1, 2), (3, 4), (5, 6)]) will fail, because it will try to compute
      [ f((1, 2)), f((3, 4)), f((5, 6)) ]
  In other words, f will expect two arguments but only receive a single tuple as an argument. Instead, starmap will compute
      [ f(*(1, 2)), f(*(3, 4)), f(*(5, 6)) ]
  which applies, e.g., f(1, 2) as expected. (A runnable sketch follows after this list.)
Tip
All of the above methods are blocking, which means that the parent process stops or "blocks" until the subprocesses complete their assigned tasks. The multiprocessing library offers non-blocking versions, which can be distinguished by the _async suffix in their name (e.g., map_async, apply_async).
The official documentation has more details on how these work in practice.
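For instance, map_async returns immediately with an AsyncResult object, and the parent process can keep working before collecting the results; a brief sketch:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(processes=4) as pool:
        result = pool.map_async(square, range(10))  # returns immediately
        # ... the parent process can do other work here ...
        print(result.get())  # blocks until results are ready: [0, 1, 4, ..., 81]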
Process Pool examples
Example 1: Parallel matrix sum
We wish to compute the sum over all the elements of a 2D matrix A
. One way to do so is to make each process compute the sum over a separate row of A
, and add these partial sums up in the parent process.
The key to writing this using a Pool is to recall that a 2D numpy.array object is iterated row-by-row by default:
import numpy as np

A = np.array([
    [1, 2, 3],
    [4, 5, 6],
    [7, 8, 9]
])

for row in A:
    print(row)

# will print:
# [1 2 3]
# [4 5 6]
# [7 8 9]
Given this, we can use the Pool.map
method to accomplish our task:
import os
import numpy as np
from multiprocessing import Pool

nproc = os.cpu_count()

# create a random array
A = np.random.randint(0, 1000, size=(1000, 100))

with Pool(processes=nproc) as pool:
    # note: chunksize is optional
    parsum = sum(pool.map(sum, A, chunksize=(np.shape(A)[0] // nproc)))

print(parsum, np.sum(A))  # the two sums will be the same
Example 2: Parallel merging of sorted lists
Recall the following task: given sorted lists \(L_1, \dots, L_K\), we wish to merge them into a big list containing all the elements of \(L_1, \dots, L_K\) in sorted order.
The natural divide-and-conquer algorithm to accomplish this in time \(O(N \log K)\), where \(N\) is the total number of elements, is the following:
- Write a function single_merge(A, B) that merges sorted A and B in time \(O(\texttt{len}(A) + \texttt{len}(B))\).
- Pair the lists up: form \((L_1, L_2), (L_3, L_4), \dots, (L_{K-1}, L_K)\).
- Merge each pair using single_merge.
- Repeat steps 2-3 until a single list remains.
Note: in Step 2, we can always assume that \(K\) is a multiple of \(2\) by adding the empty list into our collection without altering the result, if necessary.
Now, we explore how to parallelize this algorithm.
Step 1: if Ls
is a list of sorted lists with K
lists in total, we can use
zip(Ls[:(K // 2)], Ls[(K // 2):])
to create an iterable over pairs \((L_1, L_{K/2+1}), \dots, (L_{i}, L_{K/2 + i})\). Since zip
"loses" elements if its two arguments don't have the same length, we pad an empty list to make sure K
is even, if necessary.
Step 2: We now apply the single_merge
function to each pair in the result of zip
using Pool.starmap
. The reason we have to use starmap
was explained at the beginning of our discussion: single_merge
expects two arguments, yet the output of zip
is a list of tuples.
Step 3: Repeat Steps 1-2 until Step 2 results in a single list. The total number of iterations is \(O(\log_2 K)\), since each iteration halves the number of sorted lists to merge.
The Python code below contains the full example:
from os import cpu_count
from multiprocessing import Pool
import heapq

def single_merge(list_a, list_b):
    # O(len(list_a) + len(list_b)) merge of two sorted lists
    return list(heapq.merge(list_a, list_b))

def merge(Ls):
    while True:
        k = len(Ls)
        if k == 1:  # eventually, we will end up in this case
            return Ls[0]
        if k % 2 == 1:
            Ls.append([])  # make sure the number of lists is divisible by 2
            k += 1
        # parallelize the merging step
        with Pool(processes=cpu_count()) as pool:
            Ls = pool.starmap(single_merge, zip(Ls[:(k // 2)], Ls[(k // 2):]))
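A short usage sketch, again guarded by __main__ so that child processes do not re-run the setup code:

if __name__ == "__main__":
    # three sorted lists to merge
    Ls = [[1, 4, 7], [2, 5, 8], [0, 3, 6, 9]]
    print(merge(Ls))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]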