Concurrent Processing
Haiway builds concurrency around asyncio, ctx.spawn(...), and scope-bound task groups. The
helpers in haiway.helpers.concurrent add bounded fan-out and stream-merging patterns on top of
that model without leaving the framework's context and observability rules.
Structured Concurrency Basics
Within Haiway, spawned tasks belong to the nearest isolated scope's task group.
- ctx.spawn(...) keeps work tied to the current scope.
- Scope exit waits for in-scope tasks to settle.
- Child task failures surface through the task group unless explicitly handled.
- State, logging, and observability remain available inside spawned tasks.
If work must outlive the current scope, use ctx.spawn_background(...) instead of ctx.spawn(...).
process_concurrently
Use process_concurrently(...) when you need bounded concurrent side effects and do not need
results back.
```python
from haiway import ctx, process_concurrently

async def send_notification(user_id: str) -> None:
    client = ctx.state(NotificationClient)
    await client.send(user_id, "ready")

await process_concurrently(
    ["u1", "u2", "u3", "u4"],
    send_notification,
    concurrent_tasks=3,
)
```
Behavior:
- Accepts Iterable and AsyncIterable.
- Runs at most concurrent_tasks handlers at once.
- Raises the first handler exception by default.
- With ignore_exceptions=True, logs handler failures and keeps going.
execute_concurrently
Use execute_concurrently(...) when you have one async handler and want ordered results.
```python
from haiway import execute_concurrently

async def fetch_user(user_id: str) -> dict[str, object]:
    return await api.fetch_user(user_id)

results = await execute_concurrently(
    fetch_user,
    ["u1", "u2", "u3"],
    concurrent_tasks=2,
)
```
Key details:
- Result order matches input order, not completion order.
- Supports both Iterable and AsyncIterable.
- return_exceptions=True returns exception objects in place instead of raising.
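The input-order guarantee can be demonstrated with the standard library. In this sketch (an illustration of the semantics, not Haiway's implementation), later inputs deliberately finish first, yet results still come back in input order because asyncio.gather returns results positionally.

```python
import asyncio

async def gather_bounded(handler, elements, *, concurrent_tasks: int):
    # Hypothetical stand-in for a bounded, order-preserving mapper.
    semaphore = asyncio.Semaphore(concurrent_tasks)

    async def run(element):
        async with semaphore:
            return await handler(element)

    # gather(...) returns results positionally, so output order matches
    # input order even when tasks complete out of order.
    return await asyncio.gather(*(run(e) for e in elements))

async def fetch(user_id: str) -> str:
    # Earlier inputs take longer, so completion order is reversed.
    await asyncio.sleep({"u1": 0.03, "u2": 0.02, "u3": 0.01}[user_id])
    return f"user:{user_id}"

results = asyncio.run(gather_bounded(fetch, ["u1", "u2", "u3"], concurrent_tasks=3))
```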
concurrently
Use concurrently(...) when the work is already represented as coroutine objects and each coroutine
may have different parameters.
```python
from haiway import concurrently

coroutines = [
    fetch_user("u1"),
    fetch_account("u1"),
    fetch_permissions("u1"),
]

results = await concurrently(
    coroutines,
    concurrent_tasks=2,
)
```
This is similar to execute_concurrently(...), but it consumes ready-made coroutines instead of
applying a single handler over elements.
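Bounding concurrency over pre-created coroutine objects works because a coroutine does not start executing until it is awaited. This stdlib sketch (a hypothetical equivalent, not Haiway's implementation) wraps each coroutine so the semaphore still gates when it runs.

```python
import asyncio

async def run_all(coroutines, *, concurrent_tasks: int):
    # Hypothetical stand-in for bounded execution of ready-made coroutines.
    semaphore = asyncio.Semaphore(concurrent_tasks)

    async def run(coro):
        # The coroutine object only begins executing once awaited here,
        # so the semaphore still bounds concurrency.
        async with semaphore:
            return await coro

    # Results come back in the order the coroutines were supplied.
    return await asyncio.gather(*(run(c) for c in coroutines))

async def value(x: int) -> int:
    await asyncio.sleep(0.01)
    return x * 10

results = asyncio.run(run_all([value(1), value(2), value(3)], concurrent_tasks=2))
```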
stream_concurrently
stream_concurrently(...) merges two async iterables and yields items as soon as either source
produces them.
```python
import asyncio

from haiway import stream_concurrently

async def numbers():
    for i in range(3):
        await asyncio.sleep(0.1)
        yield i

async def letters():
    for letter in "ab":
        await asyncio.sleep(0.15)
        yield letter

async for item in stream_concurrently(numbers(), letters(), exhaustive=True):
    print(item)
```
Important semantics:
- Default exhaustive=False stops the merged stream when either source finishes.
- exhaustive=True keeps yielding until both sources finish.
- Yielded order depends on arrival timing.
- Exceptions from either source are propagated.
- Cancelling the consumer cancels the producer tasks created for both sources.
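The exhaustive-merge semantics can be sketched with a queue and one producer task per source (an illustration of the pattern, not Haiway's implementation): each producer pumps its source into a shared queue, and the consumer keeps reading until both producers report completion; the finally block mirrors the producer-cancellation behavior above.

```python
import asyncio

async def merge(a, b):
    # Hypothetical exhaustive merge of two async iterables.
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()  # Sentinel marking one source as exhausted.

    async def pump(source):
        async for item in source:
            await queue.put(item)
        await queue.put(DONE)

    tasks = [asyncio.create_task(pump(a)), asyncio.create_task(pump(b))]
    try:
        finished = 0
        while finished < 2:
            item = await queue.get()
            if item is DONE:
                finished += 1
            else:
                # Items are yielded in arrival order across both sources.
                yield item
    finally:
        # If the consumer stops early, both producer tasks are cancelled.
        for task in tasks:
            task.cancel()

async def numbers():
    for i in range(3):
        await asyncio.sleep(0.01)
        yield i

async def letters():
    for letter in "ab":
        await asyncio.sleep(0.015)
        yield letter

async def collect():
    return [item async for item in merge(numbers(), letters())]

items = asyncio.run(collect())
```

All five items from both sources arrive, in timing-dependent order.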
Cancellation and Failure Semantics
All four helpers are implemented with local ContextTaskGroups plus ctx.spawn(...).
That gives them predictable behavior:
- Cancellation propagates into spawned work.
- Uncaught task failures stop the operation unless you explicitly request exception-tolerant mode.
- Result-collecting helpers preserve order even when task completion order differs.
Choosing the Right Helper
- process_concurrently(...): side effects only
- execute_concurrently(...): apply one handler and collect ordered results
- concurrently(...): run pre-created coroutines and collect ordered results
- stream_concurrently(...): merge two async iterables into one stream