BatchNode API Reference
Overview
BatchNode executes multiple nodes concurrently using Python's `ThreadPoolExecutor`. This is essential for Fan-Out/Fan-In patterns, where independent branches run at the same time and their results are merged back into a single state.
Class Signature
```python
class BatchNode(BaseNode):
    def __init__(self, nodes: List[BaseNode], next_node: BaseNode = None)
```
Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| `nodes` | `List[BaseNode]` | Yes | List of nodes to execute in parallel. Must be non-empty, and all elements must be `BaseNode` instances. |
| `next_node` | `BaseNode` | No | The single node to execute after all parallel nodes finish. Defaults to `None` (graph termination). |
Behavior
1. Execution Flow
- Snapshot the current state
- Create deep copies of the state for each parallel node
- Execute all nodes concurrently in separate threads
- Merge non-conflicting updates back into the main state
- Continue to `next_node`
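A minimal sketch of the fan-out half of this flow, assuming each node exposes an `execute(state)` method that returns its updated state dict (hypothetical internals, for intuition only, not the library's source):

```python
import copy
from concurrent.futures import ThreadPoolExecutor

def fan_out(nodes, state):
    snapshot = copy.deepcopy(state)  # 1. snapshot the current state
    with ThreadPoolExecutor(max_workers=len(nodes)) as pool:
        futures = [
            pool.submit(node.execute, copy.deepcopy(snapshot))  # 2. isolated copy per node
            for node in nodes
        ]
    results = [f.result() for f in futures]  # 3. wait for every thread to finish
    return snapshot, results
```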
2. State Isolation
Each node runs with its own deep copy of the state, preventing race conditions. Threads cannot interfere with each other's execution.
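This is plain `copy.deepcopy` behavior, so mutations in one branch never leak into another:

```python
import copy

state = {"text": "hello", "tags": []}
copy_a = copy.deepcopy(state)
copy_b = copy.deepcopy(state)

copy_a["tags"].append("sentiment")  # visible only in copy_a
copy_b["tags"].append("entities")   # visible only in copy_b

print(state["tags"])  # [] -- the original state is untouched
```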
3. State Merging Strategy
After all threads complete, BatchNode merges results:
- New keys: automatically added to the main state
- Updated keys: last write wins (race condition; avoid overlapping `output_key` values)
- Unchanged keys: ignored
> [!WARNING]
> Overlapping Keys: If multiple nodes write to the same state key, the result is non-deterministic due to thread scheduling. Design your graph to ensure each parallel node writes to unique keys.
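Continuing the internals sketch from above, the merge step might look like this (again hypothetical, not the library's source):

```python
def merge(state, snapshot, results):
    for result in results:                  # iterated in thread-completion order
        for key, value in result.items():
            if key not in snapshot:
                state[key] = value          # new key: added automatically
            elif value != snapshot[key]:
                state[key] = value          # updated key: last write wins
            # unchanged keys: ignored
    return state
```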
4. Error Handling
- If any thread raises an exception, it's logged and `last_error` is set in the main state
- Other threads continue execution
- The `BatchNode` always proceeds to `next_node` (it does not fail fast)
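In terms of the sketch above, this corresponds to catching each future's exception individually (hypothetical code):

```python
results = []
for future in futures:
    try:
        results.append(future.result())  # re-raises any exception from that thread
    except Exception as exc:
        # Log and record the failure, but keep collecting the other results
        print(f"branch failed: {exc}")
        state["last_error"] = str(exc)
# Merge whatever succeeded and continue to next_node regardless
```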
Example Usage
Simple Parallel Execution
```python
from lar import BatchNode, LLMNode, GraphExecutor

# Define three independent analysis tasks
analyst_1 = LLMNode(
    model_name="gpt-4",
    prompt_template="Analyze the sentiment of: {text}",
    output_key="sentiment_analysis",
)
analyst_2 = LLMNode(
    model_name="gpt-4",
    prompt_template="Extract key entities from: {text}",
    output_key="entity_extraction",
)
analyst_3 = LLMNode(
    model_name="gpt-4",
    prompt_template="Summarize: {text}",
    output_key="summary",
)

# Run all three in parallel
parallel_batch = BatchNode(
    nodes=[analyst_1, analyst_2, analyst_3],
    next_node=None,  # End after merging results
)

# Execute
executor = GraphExecutor()
initial_state = {"text": "Apple announced record Q4 earnings..."}
final_state = initial_state  # the executor merges results into this dict in place
for step in executor.run_step_by_step(parallel_batch, initial_state):
    print(f"Step {step['step']}: {step['node']}")

# The final state holds all three analyses
print(final_state.get("sentiment_analysis"))
print(final_state.get("entity_extraction"))
print(final_state.get("summary"))
```
Newsroom Pattern (Real-World Example)
See `examples/scale/3_parallel_newsroom.py` for a production pattern (a condensed wiring sketch follows the list below):
- Planner Node: LLM generates story angles
- BatchNode: 3 reporter agents research in parallel
- Aggregator Node: Combine findings into final article
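A condensed sketch of that wiring. The prompts, output keys, and exact node setup here are illustrative assumptions, not the contents of the example file; how the planner chains into the batch is left to the example:

```python
from lar import BatchNode, LLMNode

# 1. Planner: generates story angles (runs before the batch)
planner = LLMNode(
    model_name="gpt-4",
    prompt_template="Generate 3 story angles for: {topic}",
    output_key="story_angles",
)

# 3. Aggregator: combines the merged findings after the batch
aggregator = LLMNode(
    model_name="gpt-4",
    prompt_template="Write an article from: {research_1} {research_2} {research_3}",
    output_key="final_article",
)

# 2. Three reporters research in parallel, then hand off to the aggregator
reporters = BatchNode(
    nodes=[
        LLMNode(model_name="gpt-4", prompt_template="Research the first angle in: {story_angles}", output_key="research_1"),
        LLMNode(model_name="gpt-4", prompt_template="Research the second angle in: {story_angles}", output_key="research_2"),
        LLMNode(model_name="gpt-4", prompt_template="Research the third angle in: {story_angles}", output_key="research_3"),
    ],
    next_node=aggregator,
)
```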
Performance Considerations
When to Use BatchNode
✅ Good Use Cases:
- Multiple LLM calls with independent inputs (e.g., translating to 5 languages)
- Parallel API requests (e.g., fetching data from 3 sources)
- Independent data processing tasks

❌ Bad Use Cases:
- Sequential dependencies (use linear chains)
- Shared mutable resources (threads will conflict)
- CPU-bound tasks in CPython (the GIL limits parallelism; use `ProcessPoolExecutor` instead)
Speedup Calculation
For I/O-bound tasks (LLM calls, API requests):
```
Sequential time: N × T
Parallel time:   T + overhead

Ideal speedup:   N
Real speedup:    ≈ 0.8 N  (accounting for thread overhead)
```
For 3 LLM calls at 2 s each:
- Sequential: 6 seconds
- Parallel: ~2.2 seconds (≈2.7× faster)
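You can reproduce the shape of this speedup with a plain `ThreadPoolExecutor` and a simulated 2-second call:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_llm_call(prompt):
    time.sleep(2)  # simulate a 2-second I/O-bound LLM call
    return f"response to {prompt!r}"

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(fake_llm_call, ["a", "b", "c"]))
print(f"parallel: {time.perf_counter() - start:.1f}s")  # ~2s, vs ~6s sequentially
```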
Common Patterns
1. A/B Testing (Compare Multiple Prompts)
```python
BatchNode(
    nodes=[
        LLMNode(model_name="gpt-4", prompt_template="Prompt A: {task}", output_key="result_a"),
        LLMNode(model_name="gpt-4", prompt_template="Prompt B: {task}", output_key="result_b"),
    ]
)
```
2. Multi-Model Consensus
```python
BatchNode(
    nodes=[
        LLMNode(model_name="gpt-4", prompt_template="{query}", output_key="gpt4_answer"),
        LLMNode(model_name="claude-3", prompt_template="{query}", output_key="claude_answer"),
        LLMNode(model_name="gemini-pro", prompt_template="{query}", output_key="gemini_answer"),
    ],
    next_node=vote_aggregator,  # Majority voting
)
```
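`vote_aggregator` is not defined in this snippet. A minimal version could be a `ToolNode` that majority-votes over the three answers (a sketch, assuming `ToolNode` accepts a plain function as in the next pattern):

```python
from collections import Counter
from lar import ToolNode

def majority_vote(gpt4_answer, claude_answer, gemini_answer):
    votes = Counter([gpt4_answer, claude_answer, gemini_answer])
    answer, count = votes.most_common(1)[0]
    return answer if count >= 2 else gpt4_answer  # arbitrary fallback on a 3-way tie

vote_aggregator = ToolNode(
    tool_function=majority_vote,
    input_keys=["gpt4_answer", "claude_answer", "gemini_answer"],
    output_key="consensus_answer",
)
```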
3. Parallel Data Ingestion
```python
BatchNode(
    nodes=[
        ToolNode(tool_function=fetch_twitter, input_keys=["query"], output_key="twitter_data"),
        ToolNode(tool_function=fetch_reddit, input_keys=["query"], output_key="reddit_data"),
        ToolNode(tool_function=fetch_news, input_keys=["query"], output_key="news_data"),
    ]
)
```
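The `fetch_*` tool functions above are placeholders. Each one just needs to accept the declared input keys as arguments and return the value for its output key; a minimal sketch against a hypothetical endpoint:

```python
import requests

def fetch_news(query):
    # Hypothetical endpoint; substitute your real news source
    resp = requests.get("https://example.com/api/news", params={"q": query}, timeout=10)
    resp.raise_for_status()
    return resp.json()
```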
Debugging
Viewing Audit Logs
The audit log for a BatchNode step shows:
- `node`: `"BatchNode"`
- `state_diff`: all merged keys
- Individual child nodes are NOT logged (by design; they run in threads)
To debug child nodes, wrap them in a separate graph and inspect their logs independently.
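For example, reusing `analyst_1` from the earlier example (assuming any node can serve as an entry point, as the examples above suggest):

```python
# Run one child node on its own graph so its steps appear in the audit log
debug_executor = GraphExecutor()
for step in debug_executor.run_step_by_step(analyst_1, {"text": "sample input"}):
    print(f"Step {step['step']}: {step['node']}")
```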
Validation
BatchNode validates construction parameters:
- `nodes` must be a non-empty list
- All elements must be `BaseNode` instances
- `next_node` must be a `BaseNode` or `None`
Example Error:
```python
>>> BatchNode(nodes=["string"])  # Invalid
ValueError: nodes[0] must be a BaseNode instance, got str
```
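The checks amount to logic like the following (a sketch of equivalent behavior, not the library's source):

```python
def _validate(nodes, next_node):
    if not isinstance(nodes, list) or not nodes:
        raise ValueError("nodes must be a non-empty list")
    for i, node in enumerate(nodes):
        if not isinstance(node, BaseNode):
            raise ValueError(f"nodes[{i}] must be a BaseNode instance, got {type(node).__name__}")
    if next_node is not None and not isinstance(next_node, BaseNode):
        raise ValueError("next_node must be a BaseNode or None")
```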
See Also
- Multi-Agent Orchestration Guide
- Parallel Corporate Swarm Example
- RouterNode API - For sequential conditional logic