Streaming API (#420)
This PR adds an API for streaming data from workflows.
```python
def write_stream(cls, key: str, value: Any) -> None:
"""
Write a value to a stream.
Args:
key(str): The stream key / name within the workflow
value(Any): A serializable value to write to the stream
"""
def close_stream(cls, key: str) -> None:
"""
Close a stream.
Args:
key(str): The stream key / name within the workflow
"""
def read_stream(cls, workflow_id: str, key: str) -> Generator[Any, Any, None]:
"""
Read values from a stream as a generator.
This function reads values from a stream identified by the workflow_id and key,
yielding each value in order until the stream is closed or the workflow terminates.
Args:
workflow_id(str): The workflow instance ID that owns the stream
key(str): The stream key / name within the workflow
Yields:
Any: Each value in the stream until the stream is closed
"""
```
All methods also have async equivalents.
Streams are append-only and immutable and can be read from anywhere.
Streams can be written to from a workflow or its steps. Writing to a
stream from a workflow is done with exactly-once semantics. Writing to a
stream from a step is at-least-once; if a step fails and is retried, it
may write to the stream multiple times. P
Peter Kraft committed
0bf825a6a98d5fc860268fe176f8eee066862331
Parent: 3e52376
Committed by GitHub <noreply@github.com>
on 8/6/2025, 6:22:03 PM