# Parallel concepts
These are general concepts, with examples primarily in Python.

Note when playing with these examples: errors can get swallowed in threads we create, so static checks are really useful. The mechanism used to show an error varies based on several factors: `concurrent.futures` rethrows the exception when you ask for the result, `threading` prints out errors as strings (customizable in 3.8+, though), etc.
Quite a few of these work across threading, interpreters, and multiprocessing in Python, and are generally found in other languages too, sometimes with different names.
## Thread safety
Is it safe to use a variable from multiple threads? Generally, read-only variables are fine; if you don’t modify something, there’s no problem. But if you modify a variable, there’s a potential problem. Here’s an example:
```python
from concurrent.futures import ThreadPoolExecutor

x = [0]

def add(num: int) -> None:
    for i in range(num):
        x[0] += 1

with ThreadPoolExecutor() as pool:
    for _ in range(8):
        pool.submit(add, 10_000)

print(x[0])
```
First, I set up a mutable variable, which is simply a list containing one item. I did this so I can avoid writing the word `global`, to show that the problem is about mutating variables shared between threads, not something unique to global variables.

I use `+=` to add 1. Remember, `x += 1` really does `x = x + 1`: the processor reads the value, then adds one, then writes the value. If another thread reads the value in the middle of that process, then the final result will be one less than you expected!
If you run this with traditional Python, you'll probably get the trivial result: 80,000. That's due to Python's context switching; it doesn't context switch within this operation. However, if you run it with free-threaded Python, you'll get the normal multithreading result: some smaller number (around 17,000 for me). (Don't rely on this in traditional Python either; this is a highly simplified example, and we are relying completely on the context-switching points helping us out here!)
This variable is not thread-safe, which means we can't safely mutate it from multiple threads. We'll look at several ways to make or use thread-safe variables.
## Mutex
One of the most general tools for thread safety is a mutex. In Python, it's called a `Lock` or `RLock`, depending on whether you can re-enter it from the same thread. Let's take a look:
```python
from concurrent.futures import ThreadPoolExecutor
import threading

x = [0]
lock = threading.Lock()

def add(num: int) -> None:
    for i in range(num):
        with lock:
            x[0] += 1

with ThreadPoolExecutor() as pool:
    for _ in range(8):
        pool.submit(add, 10_000)

print(x[0])
```
Here, we create an instance of `threading.Lock`. Now we protect the part of our code that is mutating a variable by taking a lock with our mutex (using the `with` block, which releases it automatically when exiting the block).

Note this works because `Lock` instances are themselves thread-safe, meaning taking a lock is guaranteed to work from any thread.
If you were going for performance, remember that you now have some extra overhead, and only one thread at a time can make it through this portion of the code. Free-threaded Python now gives the correct result, but it is not really any faster than traditional Python, because it's basically doing the same thing. Doing a lot of independent work and locking around just the small portion of code that performs a synchronized update would be much better.
One common issue is called a "deadlock": this is when you end up trying to acquire a lock that is already acquired and won't be released. Sometimes when it happens, it's from the same thread, such as in a recursive function call. Since you usually don't need the lock to protect against the same thread anyway, `RLock` only blocks when a *different* thread is holding the lock, solving this issue at least in that one case.
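Here's a minimal sketch of that recursive case (the `count_down` function is made up for illustration); swap the `RLock` for a plain `Lock` and the second acquisition blocks forever:

```python
import threading

lock = threading.RLock()

def count_down(n: int) -> None:
    with lock:  # An RLock can be re-acquired by the thread that holds it
        if n > 0:
            count_down(n - 1)  # With a plain Lock, this line would deadlock

count_down(5)
```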
## Semaphore
This is like a `Lock`, but instead of having an on/off state, it keeps track of a number with a pre-set limit. For example, you can use a semaphore with a value of 4 to create something that never runs more than 4 at a time. This is useful if you have lots of threads, but a resource that you don't want all the threads to hit at once.
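A minimal sketch of that (the `fetch` function and the limit of 4 are made up for illustration):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical limit: at most 4 threads may use the resource at once
limit = threading.Semaphore(4)

def fetch(i: int) -> None:
    with limit:  # Blocks if 4 threads are already inside
        time.sleep(0.1)  # Stand-in for hitting the rate-limited resource

with ThreadPoolExecutor(max_workers=16) as pool:
    for i in range(16):
        pool.submit(fetch, i)
```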
## Atomic (not in Python)
Often, you need to lock because you are updating a single value. Modern processors have specialized instructions that allow a value (like an integer) to be updated in a thread-safe way without all the overhead of a lock (and they don't need a separate lock object, either).
Python doesn’t have this, since modifying a Python object far more than a single
integer operation, and it’s just a performance benefit over a mutex. You can use
Event
, which is basically an atomic bool that you can wait on to be set
(True).
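A minimal sketch of `Event` in use (the `worker` function is made up for illustration):

```python
import threading

ready = threading.Event()

def worker() -> None:
    ready.wait()  # Blocks until another thread calls ready.set()
    print("Got the signal")

t = threading.Thread(target=worker)
t.start()
ready.set()  # Atomically sets the flag to True and wakes all waiters
t.join()
```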
## Queue
Another common use case is adding and removing items, usually so one thread can communicate work to another. A typical pattern is creating a thread pool of workers, then feeding work to them, with each thread picking up and processing available work. This requires a thread-safe container, one usually optimized for input and output rather than, say, iteration.
Python has a `Queue` class, which is a very powerful first-in, first-out (FIFO) queue. A trimmed-down version, `SimpleQueue`, is available, which doesn't have the task-tracking additions. There are also last-in, first-out (LIFO) and priority queues, depending on how you want to process tasks.
Here’s an example:
```python
from concurrent.futures import ThreadPoolExecutor
import contextlib
import queue
import time

task_queue = queue.Queue()
result_queue = queue.SimpleQueue()

def worker(q: queue.Queue, r: queue.SimpleQueue) -> None:
    with contextlib.suppress(queue.ShutDown):
        while True:
            task = q.get()
            time.sleep(0.1)
            r.put(task * 10)
            q.task_done()

with ThreadPoolExecutor() as pool:
    # Start up 8 workers
    for _ in range(8):
        pool.submit(worker, task_queue, result_queue)

    # Load work for the workers to do
    for i in range(50):
        task_queue.put(i)

    # Wait until task_done() is called the same number of times as items in the queue
    task_queue.shutdown()
    task_queue.join()

print(sum(result_queue.get() for _ in range(result_queue.qsize())))
```
Here, we set up two queues. The first one holds "jobs", and we tell the queue when we've completed each task so that it knows when it's done. For clean shutdown of the worker threads, this also uses the `shutdown` mechanism introduced in Python 3.13; you can implement clean shutdown yourself if you want to support older versions.
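One common way to do that on older Pythons, sketched here, is a sentinel value: push one per worker, and each worker exits when it sees it (the `SENTINEL` name and worker body are made up for illustration):

```python
import queue
import threading

SENTINEL = None  # Hypothetical marker meaning "no more work"

q: queue.Queue = queue.Queue()

def worker() -> None:
    while True:
        task = q.get()
        if task is SENTINEL:
            q.task_done()
            break  # Exit cleanly without needing queue.shutdown()
        q.task_done()  # A real worker would process the task first

threads = [threading.Thread(target=worker) for _ in range(8)]
for t in threads:
    t.start()

for i in range(50):
    q.put(i)

for _ in threads:
    q.put(SENTINEL)  # One sentinel per worker

q.join()  # Waits until every put has been marked done
for t in threads:
    t.join()
```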
You might notice, if you look at the API for `Queue`, that there are `block` and `timeout` arguments on the `get` and `put` methods. You can decide if you want to wait (`block` the current thread) until something is available, and also set a timeout for how long to wait. Queues can also have a maximum size, which is why these exist for `put` as well. And, like locks, you can end up with deadlocks if you are not careful.
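For example (the one-second timeout and the size limit of 2 are arbitrary choices for this sketch):

```python
import queue

q: queue.Queue = queue.Queue(maxsize=2)  # A bounded queue: put() can block too
q.put(1)

print(q.get(block=True, timeout=1.0))  # An item is ready: prints 1 immediately
try:
    q.get(block=True, timeout=1.0)  # Nothing left: waits up to one second...
except queue.Empty:
    print("Gave up waiting")  # ...then raises queue.Empty
```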
**Error checking:** This example will swallow errors if you play with it and make a mistake. To fix that, you need to save the futures returned by the `.submit(...)` calls, and then call `.result()` on them; that will reraise the exception in the current thread.
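A minimal sketch of that fix (the deliberately failing `broken` function is made up to show the reraise):

```python
from concurrent.futures import ThreadPoolExecutor

def broken() -> None:
    raise ValueError("oops")

with ThreadPoolExecutor() as pool:
    futures = [pool.submit(broken) for _ in range(2)]

for future in futures:
    future.result()  # Reraises the worker's ValueError in this thread
```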
## Barrier
You can set a barrier, which pauses threads until all of them reach that point. For example, if you have 4 threads all computing something, you could use a barrier to ensure all the threads are done with that computation before any of them moves on to the next step.
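A minimal sketch of that two-step pattern (the `compute` function is made up for illustration):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

barrier = threading.Barrier(4)

def compute(i: int) -> int:
    partial = i * i  # Step 1: each thread computes its piece
    barrier.wait()  # Block here until all 4 threads have finished step 1
    return partial  # Step 2: safe to assume every piece is ready

with ThreadPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        pool.submit(compute, i)
```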
## Event loop
One common design for a reactive program is an event loop, where the program is designed around a central loop that reacts to input. This is common with async programming, but is also used by things like older GUI toolkits without async. Let's try creating our own from scratch using generators:
```python
import time

class NotReady(float):
    pass

def event_loop(tasks):
    while tasks:  # Stops when all tasks are done
        waits = []
        for task in list(tasks):  # Copy, since we remove finished tasks below
            try:
                res = task.send(None)  # The task runs here
                if isinstance(res, NotReady):
                    waits.append(res)  # Build up all the requested waits
                else:
                    yield res  # Produce result
            except StopIteration:
                tasks.remove(task)  # Task done, remove from tasks
        if waits:
            time.sleep(min(waits))  # Wait for the shortest requested wait

def sleep(t):
    endtime = time.time() + t
    while time.time() < endtime:
        yield NotReady(endtime - time.time())
    yield f"Sleep {t} over"

print(*event_loop([sleep(3), sleep(2), sleep(1), sleep(4)]), sep="\n")
```
In this example, we start with tasks, and loop over them. Each task yields an estimate of how long until it will be ready. If you were to use one task within another task, you would need `yield from` for the inner task. The loop waits for the shortest task to be ready, then tries again. It's basic, but the idea is there.
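For example, a hypothetical composite task that chains two sleeps, reusing the `sleep` and `event_loop` definitions from the block above:

```python
def double_sleep(t):
    # yield from delegates: the inner task's NotReady waits (and its
    # final result) pass straight through to the event loop
    yield from sleep(t)
    yield from sleep(t)

print(*event_loop([double_sleep(1), sleep(3)]), sep="\n")
```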
We are using Python's generator system (asyncio was originally built on it), but we could have implemented it with the async special methods instead; it would have been more verbose (since those weren't really designed to be implemented by hand), but it's quite doable.
Let’s try the same thing with asyncio:
```python
import asyncio

async def sleep(t):
    await asyncio.sleep(t)
    return f"Sleep {t} over"

async def main():
    results = await asyncio.gather(sleep(3), sleep(2), sleep(1), sleep(4))
    print(results)

asyncio.run(main())
```
We don’t sort the output here, but otherwise, it runs about the same, and takes
the same total amount of time. The difference here is we using a pre-existing
awaitable (sleep), so we have to use await
, which is really just yield from
but using the async-named special methods.
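To make that connection concrete, here's a sketch of a hand-written awaitable (the `Sleep` class is made up for illustration): `await` drives whatever iterator `__await__` returns, just as `yield from` drives an inner generator.

```python
import asyncio

class Sleep:
    """A hypothetical hand-written awaitable."""

    def __init__(self, t: float) -> None:
        self.t = t

    def __await__(self):
        # Delegate to the iterator behind an existing coroutine; await
        # will step through it just like yield from would
        return asyncio.sleep(self.t).__await__()

async def main() -> None:
    await Sleep(0.1)
    print("Slept via a custom awaitable")

asyncio.run(main())
```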