Concurrent Processing¶
Haiway provides powerful tools for concurrent and parallel processing within its functional
programming paradigm. Built on Python's asyncio, the framework offers structured concurrency
through the context system (ctx.spawn) and advanced patterns through helper utilities. This guide
explains how to effectively use these tools to build high-performance concurrent applications.
Core Concepts¶
Structured Concurrency¶
Haiway follows structured concurrency where tasks are managed through scope task groups:
- Tasks spawned within a scope run in its task group; they keep running until they finish or are cancelled.
- Exceptions in child tasks can propagate to parent scopes.
- Resources are properly cleaned up through the scope lifecycle.
- Task isolation ensures independent execution contexts.
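As a rough sketch of these guarantees (exact propagation details depend on the scope's task group, so treat this as illustrative), a child failure surfaces to the parent when the spawned task is awaited, and the scope cleans up whatever is still pending on exit:
from haiway import ctx

async def failing_child() -> None:
    raise RuntimeError("child failed")

async def structured_example() -> None:
    async with ctx.scope("demo"):
        task = ctx.spawn(failing_child)
        try:
            await task  # the child's exception propagates to the awaiting parent
        except RuntimeError as exc:
            ctx.log_error("Child task failed", exception=exc)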
Context Propagation¶
When spawning tasks, Haiway preserves the execution context:
- State objects remain accessible via ctx.state()
- Logging maintains proper scope identification
- Observability tracking continues across task boundaries
- Variables are isolated per task (not inherited from parent)
Basic Task Spawning¶
The fundamental building block for concurrency in Haiway is ctx.spawn(), which creates tasks
within the current scope's task group:
from haiway import ctx
import asyncio

async def process_item(item_id: str) -> dict:
    # Access context state within the spawned task
    config = ctx.state(ProcessingConfig)
    ctx.log_info(f"Processing item {item_id}")
    # Simulate async work
    await asyncio.sleep(1)
    return {"id": item_id, "status": "completed"}

async def main():
    async with ctx.scope("processor"):
        # Spawn multiple tasks
        task1 = ctx.spawn(process_item, "item-1")
        task2 = ctx.spawn(process_item, "item-2")
        task3 = ctx.spawn(process_item, "item-3")

        # Tasks run concurrently
        results = await asyncio.gather(task1, task2, task3)
        ctx.log_info(f"Processed {len(results)} items")
Fire-and-Forget Tasks¶
Tasks spawned with ctx.spawn() don't need to be awaited explicitly:
async def background_task():
    while True:
        ctx.check_cancellation()  # Cooperative cancellation
        await process_queue_item()
        await asyncio.sleep(1)

async def main():
    async with ctx.scope("app"):
        # Spawn the background task
        ctx.spawn(background_task)

        # Do other work
        await handle_requests()

        # The background task is cancelled only if you cancel it or the task group raises;
        # otherwise it runs to completion before the scope exits
Task Cancellation¶
Tasks support cooperative cancellation through the context API:
async def long_running_task():
    for i in range(100):
        ctx.check_cancellation()  # Raises CancelledError if cancelled
        await process_batch(i)

async def main():
    async with ctx.scope("batch"):
        task = ctx.spawn(long_running_task)

        # Cancel after a timeout
        await asyncio.sleep(10)
        ctx.cancel()  # Cancels the current task
        # or
        task.cancel()  # Cancels a specific task
Streaming with Context¶
The ctx.stream() method enables async generators to run in the proper context:
from collections.abc import AsyncGenerator

async def generate_items() -> AsyncGenerator[str, None]:
    for i in range(10):
        yield f"item-{i}"
        await asyncio.sleep(0.1)

async def main():
    async with ctx.scope("streamer"):
        # The stream runs in its own context
        async for item in ctx.stream(generate_items):
            ctx.log_info(f"Received: {item}")
Concurrent Processing Helpers¶
The haiway.helpers.concurrent module provides four specialized functions for common concurrent
processing patterns. All functions integrate with Haiway's context system and provide controlled
parallelism.
process_concurrently¶
Process elements from an iterable without collecting results. Ideal for side-effect operations like notifications, logging, or data transformations.
from haiway.helpers.concurrent import process_concurrently

async def send_notification(user_id: str) -> None:
    client = ctx.state(NotificationClient)
    await client.send(user_id, "Your order is ready!")

async def notify_users():
    user_ids = ["user-1", "user-2", "user-3", "user-4", "user-5"]

    # Process with a concurrency limit
    await process_concurrently(
        user_ids,
        send_notification,
        concurrent_tasks=3,  # Max 3 concurrent notifications
        ignore_exceptions=True,  # Continue on individual failures
    )
Parameters:
- source: AsyncIterable[Element] | Iterable[Element] - Elements to process
- handler: Callable[[Element], Coroutine[Any, Any, None]] - Processing function
- concurrent_tasks: int = 2 - Maximum concurrent tasks
- ignore_exceptions: bool = False - If True, log exceptions but continue processing
Key Features:
- Processes elements as they become available
- Maintains the specified concurrency limit
- Automatic task cancellation on errors or cancellation
- Optional exception tolerance for resilient processing
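Because the source may also be an async iterable (per the parameter list above), elements can be consumed lazily as they are produced. A minimal sketch, using a hypothetical pending_user_ids() generator and the send_notification handler from the example above:
from collections.abc import AsyncIterator

async def pending_user_ids() -> AsyncIterator[str]:
    # Hypothetical async source: yields ids as they become available
    for i in range(100):
        yield f"user-{i}"

async def notify_pending_users() -> None:
    await process_concurrently(
        pending_user_ids(),   # async iterable source
        send_notification,    # handler from the example above
        concurrent_tasks=5,
    )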
execute_concurrently¶
Execute handler for each element and collect results in order. Perfect when you need to process collections and gather the outcomes.
from haiway.helpers.concurrent import execute_concurrently

async def fetch_user_data(user_id: str) -> dict:
    client = ctx.state(APIClient)
    return await client.get(f"/users/{user_id}")

async def fetch_all_users():
    user_ids = ["user-1", "user-2", "user-3"]

    # Execute concurrently and collect results
    results = await execute_concurrently(
        fetch_user_data,
        user_ids,
        concurrent_tasks=5,
    )

    # Results maintain order: results[0] is for user_ids[0]
    for user_id, data in zip(user_ids, results):
        ctx.log_info(f"User {user_id}: {data}")
Parameters:
- handler: Callable[[Element], Coroutine[Any, Any, Result]] - Processing function that returns results
- elements: AsyncIterable[Element] | Iterable[Element] - Elements to process
- concurrent_tasks: int = 2 - Maximum concurrent tasks
- return_exceptions: bool = False - Include exceptions in results instead of raising
Key Features:
- Results returned in same order as input elements
- Configurable exception handling via return_exceptions
- Works with both sync and async iterables
- Preserves result ordering for predictable processing
concurrently¶
Execute pre-created coroutine objects with controlled parallelism. More flexible than
execute_concurrently when coroutines need different parameters or come from different sources.
from haiway.helpers.concurrent import concurrently

async def fetch_with_timeout(url: str, timeout: float) -> dict:
    return await asyncio.wait_for(http_client.get(url), timeout)

async def fetch_different_endpoints():
    # Create coroutines with different parameters
    coroutines = [
        fetch_with_timeout("https://api.example.com/fast", 3.0),
        fetch_with_timeout("https://api.example.com/slow", 10.0),
        fetch_with_timeout("https://api.example.com/medium", 5.0),
    ]

    results = await concurrently(
        coroutines,
        concurrent_tasks=2,
    )

    # Results maintain order: results[0] comes from the first coroutine, etc.
    return results
Parameters:
- coroutines: AsyncIterable[Coroutine] | Iterable[Coroutine] - Coroutine objects to execute
- concurrent_tasks: int = 2 - Maximum concurrent tasks
- return_exceptions: bool = False - Include exceptions in results instead of raising
Key Features:
- Works directly with coroutine objects rather than applying a handler function
- Allows for different parameters per coroutine
- Maintains result ordering matching input coroutine order
- Flexible source of coroutines from different functions or generators
stream_concurrently¶
Merge two async iterators into a single stream, yielding elements as they become available from either source.
import random
from collections.abc import AsyncIterator

from haiway.helpers.concurrent import stream_concurrently

async def sensor_readings() -> AsyncIterator[float]:
    while True:
        await asyncio.sleep(0.1)
        yield random.uniform(20.0, 25.0)

async def status_updates() -> AsyncIterator[str]:
    while True:
        await asyncio.sleep(0.5)
        yield "System OK"

async def process_events():
    # Merge the streams - yields events as they arrive from either source
    async for item in stream_concurrently(
        sensor_readings(),
        status_updates(),
    ):
        if isinstance(item, float):
            ctx.log_info(f"Temperature: {item}°C")
        else:
            ctx.log_info(f"Status: {item}")
Parameters:
- source_a: AsyncIterable[ElementA] - First async iterable to consume
- source_b: AsyncIterable[ElementB] - Second async iterable to consume
- exhaustive: bool = False - If True, continue until both sources complete; if False (default), stop when either exhausts
Key Features:
- Elements yielded based on availability, not source order
- Maintains exactly one pending task per iterator for efficiency
- Default behavior stops when either source is exhausted
- Exhaustive mode continues until both sources complete
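For finite sources where every element from both iterators is needed, the exhaustive flag documented above keeps the merge running until both sides are drained. A minimal sketch, assuming two short hypothetical generators:
from collections.abc import AsyncIterator

async def finite_a() -> AsyncIterator[int]:
    for i in range(3):
        yield i

async def finite_b() -> AsyncIterator[str]:
    for label in ("x", "y", "z", "w"):
        yield label

async def drain_both() -> list[int | str]:
    merged: list[int | str] = []
    # exhaustive=True keeps yielding until both sources are exhausted
    async for element in stream_concurrently(finite_a(), finite_b(), exhaustive=True):
        merged.append(element)
    return merged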
AsyncStream¶
Push-based async stream with back-pressure, suitable for coordinating producers and a single consumer.
from haiway import AsyncStream, ctx

async def example_stream_usage() -> list[int]:
    stream: AsyncStream[int] = AsyncStream()
    results: list[int] = []

    async def producer() -> None:
        for i in range(5):
            await stream.send(i)  # waits until the consumer is ready
        stream.finish()  # signal completion (or use stream.cancel())

    # Start the producer in the current context
    ctx.spawn(producer)

    # A single consumer iterates values as they arrive
    async for value in stream:
        results.append(value)

    return results  # [0, 1, 2, 3, 4]
Behavior:
- Single-consumer: one active iteration is allowed; reuse raises an assertion.
- Back-pressure: send() suspends until the consumer accepts the element.
- Completion: finish() ends the stream; finish(exc) raises exc on the consumer.
- Cancellation: cancel() is equivalent to finish(CancelledError()).
- Post-finish sends: send() to a finished/failed stream is ignored.
When to use:
- Coordinating multiple producers that should not outpace the consumer.
- Bridging event callbacks into an async-iterable interface.
- As a building block for higher-level streaming utilities like stream_concurrently().
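As an illustration of the callback-bridging case, the hypothetical on_message callback below forwards events into an AsyncStream so they can be consumed with async for. This is only a sketch built on the send() and ctx.spawn() semantics shown above, not a prescribed pattern:
from collections.abc import Callable

from haiway import AsyncStream, ctx

def make_event_bridge() -> tuple[AsyncStream[str], Callable[[str], None]]:
    stream: AsyncStream[str] = AsyncStream()

    def on_message(payload: str) -> None:
        # Hypothetical callback invoked by an event source;
        # send() is a coroutine, so schedule it on the current scope
        ctx.spawn(stream.send, payload)

    return stream, on_message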
Error Handling Patterns¶
Exception Tolerance in Processing¶
Handle exceptions gracefully in batch operations:
async def resilient_batch_processing():
    urls = ["http://api1.com", "http://invalid", "http://api2.com"]

    # Collect exceptions as results
    results = await execute_concurrently(
        fetch_data,
        urls,
        concurrent_tasks=10,
        return_exceptions=True,
    )

    for url, result in zip(urls, results):
        if isinstance(result, BaseException):
            ctx.log_error(f"Failed to fetch {url}", exception=result)
        else:
            ctx.log_info(f"Success: {url}")

# A similar pattern works for concurrently()
async def handle_mixed_coroutines():
    results = await concurrently(
        [risky_operation(), safe_operation(), another_risky_operation()],
        concurrent_tasks=3,
        return_exceptions=True,
    )

    successes = [r for r in results if not isinstance(r, BaseException)]
    failures = [r for r in results if isinstance(r, BaseException)]
    ctx.log_info(f"Processed: {len(successes)} successes, {len(failures)} failures")
Advanced Patterns¶
Chunked Processing¶
Process large datasets in chunks:
async def process_large_dataset(items: list[str]):
    chunk_size = 100

    for i in range(0, len(items), chunk_size):
        chunk = items[i:i + chunk_size]

        # Process the chunk concurrently
        await process_concurrently(
            chunk,
            process_item,
            concurrent_tasks=10,
        )

        # Optional: add a delay between chunks
        await asyncio.sleep(1)
Dynamic Concurrency¶
Adjust concurrency based on system load:
import os

async def adaptive_processing():
    items = await get_items()

    # Determine concurrency based on system resources
    cpu_count = os.cpu_count() or 1
    memory_available = get_available_memory_gb()

    # Scale concurrency with the available resources
    concurrent_tasks = min(
        cpu_count * 2,  # 2x CPU cores
        int(memory_available / 0.5),  # 500MB per task
        50,  # Hard limit
    )

    ctx.log_info(f"Processing with {concurrent_tasks} concurrent tasks")

    await process_concurrently(
        items,
        heavy_processing,
        concurrent_tasks=concurrent_tasks,
    )
Best Practices¶
1. Choose the Right Tool¶
- ctx.spawn(): For fire-and-forget tasks or when you need task handles
- process_concurrently(): For side-effect operations without collecting results
- execute_concurrently(): For applying a handler function to elements and collecting results
- concurrently(): For executing pre-created coroutine objects with different parameters
- stream_concurrently(): For merging two async iterators into a single stream
2. Resource Management¶
Always consider resource limits:
async def resource_aware_processing():
    # Limit based on external resources
    db_pool_size = ctx.state(DatabaseConfig).pool_size
    concurrent_tasks = min(db_pool_size // 2, 20)

    await process_concurrently(
        items,
        database_operation,
        concurrent_tasks=concurrent_tasks,
    )
3. Error Boundaries¶
Isolate failures with proper error handling:
async def resilient_processing():
    async def safe_process(item: str) -> Result | None:
        try:
            return await risky_operation(item)
        except Exception as e:
            ctx.log_error(f"Failed processing {item}", exception=e)
            return None

    results = await execute_concurrently(
        safe_process,
        items,
        concurrent_tasks=10,
    )

    # Filter out failures
    successful = [r for r in results if r is not None]
4. Monitoring and Observability¶
Track concurrent operations:
import time

async def monitored_processing():
    start_time = time.time()

    ctx.record_info(
        event="batch_processing_started",
        attributes={"item_count": len(items)},
    )

    try:
        results = await execute_concurrently(
            process_item,
            items,
            concurrent_tasks=20,
        )

        duration = time.time() - start_time
        ctx.record_info(
            metric="batch_processing_duration",
            value=duration,
            kind=ObservabilityMetricKind.HISTOGRAM,
            attributes={"status": "success"},
        )
    except Exception as e:
        ctx.record_info(
            event="batch_processing_failed",
            attributes={"error": str(e)},
        )
        raise
5. Context Isolation¶
Remember that spawned tasks have isolated variable contexts:
async def context_isolation_example():
    # Set a variable in the parent
    ctx.variable(RequestID("parent-123"))

    async def child_task():
        # The variable is NOT inherited
        request_id = ctx.variable(RequestID)  # None

        # Set a new variable in the child
        ctx.variable(RequestID("child-456"))

    await ctx.spawn(child_task)

    # The parent variable is unchanged
    assert ctx.variable(RequestID).value == "parent-123"
Performance Considerations¶
Concurrency vs Parallelism¶
Haiway's concurrency is based on asyncio, which provides:
- Concurrency: Multiple tasks make progress by interleaving execution
- Not true parallelism: Single thread, so CPU-bound tasks don't run in parallel
- Ideal for I/O-bound operations: Network requests, database queries, file operations
For CPU-bound parallel processing, consider:
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def parallel_cpu_processing():
    executor = ProcessPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()

    # Run the CPU-bound task in a process pool
    result = await loop.run_in_executor(
        executor,
        cpu_intensive_function,
        large_dataset,
    )
    return result
Memory Considerations¶
Be mindful of memory usage with large concurrent operations:
async def memory_efficient_processing():
    # Process in batches to control memory usage
    batch_size = 1000
    concurrent_tasks = 20

    async with ctx.scope("batch_processor"):
        for batch in iterate_batches(large_dataset, batch_size):
            await process_concurrently(
                batch,
                process_item,
                concurrent_tasks=concurrent_tasks,
            )

            # Yield to the event loop between batches so completed work can be reclaimed
            await asyncio.sleep(0)
Summary¶
Haiway's concurrent processing tools provide:
- Structured concurrency through scope-based task management
- Context preservation across task boundaries
- High-level patterns for common concurrent operations
- Resource control through concurrency limits
- Error resilience with proper exception handling
By combining ctx.spawn() for basic task management with the specialized helpers in
haiway.helpers.concurrent, you can build efficient, maintainable concurrent applications that
fully leverage Python's async capabilities while maintaining the safety and structure of Haiway's
functional programming model.