asyncio Patterns: gather, TaskGroup, Semaphore
asyncio.gather runs awaitables concurrently and returns results in input order. TaskGroup (Python 3.11+) is the structured-concurrency replacement: better cancellation, better error propagation. asyncio.Semaphore caps in-flight concurrency. asyncio.timeout adds a deadline. These four cover most async fan-out patterns.
What it is
asyncio is Python's standard library for cooperative concurrency. One thread, one event loop, many coroutines. The fan-out patterns are different from threads (no preemption, no GIL to worry about, but blocking syscalls freeze the loop), so the primitives are different too.
The four primitives that show up in almost every asyncio program: gather, TaskGroup, Semaphore, timeout. Master these to write almost any async fan-out cleanly.
gather and TaskGroup
asyncio.gather(*aws) is the original way to run awaitables concurrently. It returns a list of results in input order. By default, the first exception cancels gather and propagates; with return_exceptions=True, exceptions are returned in the result list for inspection.
asyncio.TaskGroup (Python 3.11+) is the structured-concurrency replacement. The block scope owns the tasks; when it exits, every task is done or cancelled. If any task raises, the others are cancelled and an ExceptionGroup propagates with all the errors.
For new code, prefer TaskGroup. For older Python, gather. The mental model is the same: spawn N concurrent tasks, wait for all.
Semaphore: the missing concurrency cap
This is the single most common asyncio bug: code uses gather over a list of N items, which spawns N concurrent connections. With N = 10,000 URLs, that opens 10,000 sockets, runs out of file descriptors, and DDoSes the upstream.
The fix: wrap each task body in a semaphore acquire. With Semaphore(20), at most 20 are in flight at any moment; the rest wait their turn. asyncio.gather still returns results in input order; the semaphore gates execution, not task creation.
When this pattern recurs, factor it into a helper:
async def bounded_gather(limit, *aws):
    sem = asyncio.Semaphore(limit)
    async def run(aw):
        async with sem:
            return await aw
    return await asyncio.gather(*(run(a) for a in aws))
Timeout: enforce deadlines
asyncio.timeout(seconds) is a context manager. Anything inside it must finish within the deadline; if not, every task in the block is cancelled and TimeoutError is raised.
This is the right way to enforce per-request latency budgets. Wrap the request handler body. Wrap the call to the slow downstream. Wrap the TaskGroup of fan-out calls. Cancellation propagates correctly through the structured tasks.
The older asyncio.wait_for(coro, timeout) does the same for a single coroutine. Reach for timeout as the context manager for blocks; wait_for for one-off calls.
as_completed: when first-done-first-served matters
For results in completion order rather than input order, use as_completed. It yields tasks as they finish. Useful for hedged calls (return as soon as any replica responds), for streaming results to the client (send each as it arrives), and for early termination (stop after the first N successes).
Note: before Python 3.13, as_completed yields new awaitables rather than the original tasks, so a dict keyed by task does not recover the input. When that mapping is needed, have each coroutine return its own key:

async def tagged_fetch(url):
    return url, await fetch(url)

for fut in asyncio.as_completed([tagged_fetch(u) for u in urls]):
    url, result = await fut  # the key travels with the result

In Python 3.13+, as_completed also supports async for, which yields the original tasks, so a task-to-input dict lookup works directly. Until then, carry the key in the return value.
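A runnable sketch of the early-termination use case: stop after the first N completions and cancel the stragglers (probe is a hypothetical replica call with variable latency):

```python
import asyncio
import random

async def probe(i):
    # Hypothetical replica call with variable latency.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    return i

async def first_n(coros, n):
    pending = [asyncio.ensure_future(c) for c in coros]
    done = []
    for fut in asyncio.as_completed(pending):
        done.append(await fut)
        if len(done) == n:
            break
    # Cancel the stragglers so nothing leaks past the early exit.
    for t in pending:
        t.cancel()
    return done

results = asyncio.run(first_n([probe(i) for i in range(10)], 3))
print(len(results))  # 3
```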
What makes asyncio different from threads
No preemption. A coroutine runs until it hits an await. Long synchronous work between awaits freezes everything else. This is the most common asyncio surprise: a time.sleep(1) or a synchronous HTTP call blocks the entire event loop.
The fix: use async equivalents (asyncio.sleep instead of time.sleep, an async HTTP client's session.get instead of requests). For unavoidable blocking code, push it to a thread with asyncio.to_thread(blocking_func, *args).
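A minimal sketch of to_thread, with a hypothetical blocking_io function; the concurrent asyncio.sleep demonstrates the loop stays responsive while the blocking call runs in a worker thread:

```python
import asyncio
import time

def blocking_io():
    # Stand-in for a synchronous call (file read, requests.get, ...).
    time.sleep(0.05)
    return "payload"

async def main():
    # blocking_io runs in a worker thread; the event loop keeps scheduling.
    result, _ = await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(0.01),  # proof the loop isn't frozen
    )
    return result

print(asyncio.run(main()))  # payload
```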
Primitives at a glance
- asyncio.gather(*aws, return_exceptions=False)
- asyncio.TaskGroup (Python 3.11+, structured)
- asyncio.Semaphore (cap concurrency)
- asyncio.timeout / asyncio.wait_for (deadlines)
- asyncio.as_completed (yields tasks as they finish)
Implementation
gather runs the awaitables concurrently and returns results in the same order as inputs. With return_exceptions=False (default), the first failure surfaces immediately; with True, exceptions are returned as values for inspection.
import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as r:
        return await r.json()

async def main():
    urls = [f"https://api.example.com/items/{i}" for i in range(50)]
    async with aiohttp.ClientSession() as session:
        # All 50 in flight concurrently; result list in url order
        results = await asyncio.gather(*[fetch(session, u) for u in urls])

        # Don't fail the batch on a single error:
        results = await asyncio.gather(
            *[fetch(session, u) for u in urls],
            return_exceptions=True
        )
        for url, r in zip(urls, results):
            if isinstance(r, Exception):
                print(f"{url} failed: {r}")

TaskGroup is the modern replacement for gather. The block waits for all tasks. If any task raises, the others are cancelled and the group raises ExceptionGroup (PEP 654) collecting all the failures. Cleaner error handling, no leaked tasks.
import asyncio

async def fetch_user(uid): ...
async def fetch_cart(uid): ...
async def fetch_recs(uid): ...

async def order_view(uid):
    async with asyncio.TaskGroup() as tg:
        user = tg.create_task(fetch_user(uid))
        cart = tg.create_task(fetch_cart(uid))
        recs = tg.create_task(fetch_recs(uid))
    # Block exits only when all done; if any raised, ExceptionGroup propagates
    return {"user": user.result(), "cart": cart.result(), "recs": recs.result()}

Without a semaphore, gather over thousands of URLs opens thousands of connections, exhausting file descriptors and overwhelming the upstream service. The semaphore caps in-flight work; surplus tasks wait their turn.
import asyncio
import aiohttp

sem = asyncio.Semaphore(20)  # 20 in flight max

async def fetch_limited(session, url):
    async with sem:  # acquires permit
        async with session.get(url) as r:
            return await r.json()

async def main(urls):
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[
            fetch_limited(session, u) for u in urls
        ])

asyncio.timeout(s) cancels everything in the block if it exceeds the deadline: the pending work receives CancelledError, which asyncio.timeout converts to TimeoutError at the block boundary. Apply at request boundaries to enforce latency budgets.
import asyncio

async def with_budget():
    try:
        async with asyncio.timeout(0.5):  # 500ms budget
            async with asyncio.TaskGroup() as tg:
                tg.create_task(call_a())
                tg.create_task(call_b())
    except TimeoutError:
        # All tasks in the block were cancelled
        return fallback_response()

A bare create_task without keeping a reference can be garbage-collected before it completes: the event loop holds only a weak reference to the task, and Python may log "Task was destroyed but it is pending". TaskGroup avoids this by holding references internally. When create_task is unavoidable, store the result somewhere.
# BAD: task can be garbage collected
async def bad():
    asyncio.create_task(background_work())  # no reference held
    await asyncio.sleep(0.1)  # may not finish

# GOOD: TaskGroup holds reference and awaits
async def good():
    async with asyncio.TaskGroup() as tg:
        tg.create_task(background_work())

# ALSO GOOD: keep your own reference
class Manager:
    def __init__(self):
        self._tasks = set()
    def spawn(self, coro):
        t = asyncio.create_task(coro)
        self._tasks.add(t)
        t.add_done_callback(self._tasks.discard)

Key points
- gather runs N awaitables concurrently; the result is a list in input order.
- TaskGroup replaces gather for new code: any task failure cancels siblings, and the group raises ExceptionGroup.
- asyncio.Semaphore is the concurrency limit. Without it, gather over 10,000 URLs spawns 10,000 connections.
- asyncio.timeout(s) is a context manager. The whole block must finish in s seconds or it cancels.
- Bare `asyncio.create_task(coro)` without keeping a reference may be garbage-collected mid-flight.
Follow-up questions
- TaskGroup vs gather: which to use?
- What is the difference between Task and Future?
- Why does 'coroutine was never awaited' sometimes appear?
- How does asyncio interact with threads and processes?
Gotchas
- !create_task without keeping a reference may be GC'd before completion
- !gather with return_exceptions=False raises immediately; siblings keep running
- !Forgetting `await` on an async function returns a coroutine, not a result
- !Calling sync blocking code (time.sleep, requests.get) from async blocks the event loop
- !asyncio.Lock is for asyncio coroutines; threading.Lock will block the event loop
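The forgotten-await gotcha is easy to demonstrate; a minimal sketch with a hypothetical compute coroutine:

```python
import asyncio

async def compute():
    return 42

async def main():
    missing = compute()  # forgot await: this is a coroutine object, not 42
    assert asyncio.iscoroutine(missing)
    return await missing  # awaiting it yields the actual result

print(asyncio.run(main()))  # 42
```

Dropping the coroutine object without ever awaiting it is what triggers the "coroutine 'compute' was never awaited" RuntimeWarning.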