Streaming Responses

Streaming delivers model output incrementally as it’s generated, enabling responsive UIs and real-time applications. Iris provides a streaming-first API with well-defined channels for content, errors, and final responses.

Streaming offers several benefits:

  • Better UX: Users see responses immediately instead of waiting for completion
  • Lower perceived latency: First token appears in ~200ms vs 2-10s for full response
  • Early cancellation: Users can stop generation if output isn’t relevant
  • Memory efficiency: Process large outputs without buffering everything
  • Real-time applications: Chat interfaces, live transcription, collaborative editing

The ChatStream type provides three channels for stream processing:

type ChatStream struct {
    Ch    <-chan StreamChunk   // Incremental content chunks
    Err   <-chan error         // Error channel (at most one error)
    Final <-chan *ChatResponse // Aggregated response after completion
}

type StreamChunk struct {
    Content      string     // Text content delta
    ToolCalls    []ToolCall // Tool call deltas
    Usage        *Usage     // Token usage (final chunk only)
    FinishReason string     // Why generation stopped
}

A complete example that streams a short story and prints each chunk as it arrives:

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

func main() {
    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider)

    stream, err := client.Chat("gpt-4o").
        System("You are a helpful assistant.").
        User("Write a short story about a robot learning to paint.").
        GetStream(context.Background())
    if err != nil {
        panic(err)
    }

    // Process chunks as they arrive
    for chunk := range stream.Ch {
        fmt.Print(chunk.Content) // Print without newline for streaming effect
    }
    fmt.Println() // Final newline

    // Always check for errors after stream completes
    if err := <-stream.Err; err != nil {
        panic(err)
    }

    // Get the final aggregated response
    final := <-stream.Final
    fmt.Printf("\nTotal tokens: %d\n", final.Usage.TotalTokens)
}

When you only need the final response and don’t need real-time processing:

stream, err := client.Chat("gpt-4o").
    User("Summarize this document").
    GetStream(ctx)
if err != nil {
    return nil, err
}

// DrainStream consumes all chunks and returns the final response
resp, err := core.DrainStream(ctx, stream)
if err != nil {
    return nil, err
}
fmt.Println(resp.Output)

Build up the full response while still processing incrementally:

var fullContent strings.Builder

for chunk := range stream.Ch {
    fullContent.WriteString(chunk.Content)
    // Update UI with each chunk
    updateUI(chunk.Content)
}

// Use accumulated content
processComplete(fullContent.String())

Track streaming progress for UI indicators:

type StreamProgress struct {
    ChunksReceived int
    TotalContent   int
    LastChunk      time.Time
}

progress := &StreamProgress{}
for chunk := range stream.Ch {
    progress.ChunksReceived++
    progress.TotalContent += len(chunk.Content)
    progress.LastChunk = time.Now()

    // Update progress indicator
    fmt.Printf("\rReceived %d chunks, %d characters...",
        progress.ChunksReceived, progress.TotalContent)

    // Process content
    processChunk(chunk.Content)
}
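The LastChunk timestamp is also useful for stall detection. Below is a minimal sketch, not part of the Iris API: a watchdog goroutine cancels the stream's context if no chunk has arrived for ten seconds (the threshold, the function name, and processChunk are assumptions).

func streamWithWatchdog(ctx context.Context, client *core.Client, prompt string) error {
    const stallTimeout = 10 * time.Second

    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err != nil {
        return err
    }

    var (
        mu        sync.Mutex
        lastChunk = time.Now()
    )

    // Watchdog: give up on a stream that has stopped producing chunks.
    go func() {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                mu.Lock()
                stalled := time.Since(lastChunk) > stallTimeout
                mu.Unlock()
                if stalled {
                    cancel()
                    return
                }
            }
        }
    }()

    for chunk := range stream.Ch {
        mu.Lock()
        lastChunk = time.Now()
        mu.Unlock()
        processChunk(chunk.Content) // same hypothetical handler as above
    }
    return <-stream.Err
}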

Process chunks in batches for efficiency:

const batchSize = 10

buffer := make([]StreamChunk, 0, batchSize)
for chunk := range stream.Ch {
    buffer = append(buffer, chunk)
    if len(buffer) >= batchSize {
        processBatch(buffer)
        buffer = buffer[:0] // Reset buffer
    }
}

// Process remaining chunks
if len(buffer) > 0 {
    processBatch(buffer)
}

Handle errors both when opening the stream and after it completes:

stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
if err != nil {
    // Handle initial connection error
    return fmt.Errorf("failed to start stream: %w", err)
}

for chunk := range stream.Ch {
    fmt.Print(chunk.Content)
}

// Check for streaming errors
if err := <-stream.Err; err != nil {
    var rateLimitErr *core.RateLimitError
    var serverErr *core.ServerError

    switch {
    case errors.As(err, &rateLimitErr):
        log.Printf("Rate limited during stream, retry after: %v", rateLimitErr.RetryAfter)
    case errors.As(err, &serverErr):
        log.Printf("Server error during stream: %s", serverErr.Message)
    default:
        log.Printf("Stream error: %v", err)
    }
}

Fall back to non-streaming on error:

func ChatWithFallback(ctx context.Context, client *core.Client, prompt string) (string, error) {
    // Try streaming first
    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err == nil {
        var content strings.Builder
        for chunk := range stream.Ch {
            content.WriteString(chunk.Content)
        }
        if streamErr := <-stream.Err; streamErr == nil {
            return content.String(), nil
        }
        // Stream failed mid-way, fall through to retry
    }

    // Fall back to non-streaming
    log.Println("Streaming failed, falling back to standard request")
    resp, err := client.Chat("gpt-4o").User(prompt).GetResponse(ctx)
    if err != nil {
        return "", err
    }
    return resp.Output, nil
}

Handle partial responses when stream fails:

func StreamWithRecovery(ctx context.Context, client *core.Client, prompt string) (string, error) {
    var accumulated strings.Builder

    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err != nil {
        return "", err
    }

    for chunk := range stream.Ch {
        accumulated.WriteString(chunk.Content)
    }

    if err := <-stream.Err; err != nil {
        // Return partial content with error
        if accumulated.Len() > 0 {
            return accumulated.String(), fmt.Errorf("partial response (%d chars): %w",
                accumulated.Len(), err)
        }
        return "", err
    }
    return accumulated.String(), nil
}

Cancel streaming with context:

// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stream, err := client.Chat("gpt-4o").
    User("Write a very long essay...").
    GetStream(ctx)
if err != nil {
    return err
}

// Process in goroutine
go func() {
    for chunk := range stream.Ch {
        fmt.Print(chunk.Content)
    }
}()

// Cancel after 5 seconds
time.Sleep(5 * time.Second)
cancel()

// Check for cancellation
if err := <-stream.Err; err != nil {
    if errors.Is(err, context.Canceled) {
        fmt.Println("\nStream cancelled by user")
    }
}

Set a deadline to cancel automatically if the stream takes too long:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
if err != nil {
    return err
}

for chunk := range stream.Ch {
    fmt.Print(chunk.Content)
}

if err := <-stream.Err; err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("\nStream timed out")
    }
}

To let users stop generation explicitly, watch a stop channel alongside the stream:

func StreamWithUserCancel(ctx context.Context, client *core.Client, prompt string, stopCh <-chan struct{}) error {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err != nil {
        return err
    }

    for {
        select {
        case chunk, ok := <-stream.Ch:
            if !ok {
                // Stream completed
                return <-stream.Err
            }
            fmt.Print(chunk.Content)
        case <-stopCh:
            // User requested stop
            cancel()
            return nil
        }
    }
}
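For example, a command-line program could close the stop channel on Ctrl-C. A short sketch using os/signal; apart from StreamWithUserCancel, the names here are local to the example.

stop := make(chan struct{})
go func() {
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt)
    <-sig // Ctrl-C
    close(stop)
}()

if err := StreamWithUserCancel(ctx, client, prompt, stop); err != nil {
    log.Fatal(err)
}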

Tool calls can be streamed incrementally:

stream, err := client.Chat("gpt-4o").
    System("You are a helpful assistant with access to tools.").
    User("What's the weather in Tokyo and New York?").
    Tools(weatherTool).
    GetStream(ctx)
if err != nil {
    return err
}

toolCallAccumulator := make(map[int]*core.ToolCall)

for chunk := range stream.Ch {
    // Handle content
    if chunk.Content != "" {
        fmt.Print(chunk.Content)
    }

    // Handle tool call deltas
    for _, tc := range chunk.ToolCalls {
        if existing, ok := toolCallAccumulator[tc.Index]; ok {
            // Append to existing tool call
            existing.Function.Arguments += tc.Function.Arguments
        } else {
            // New tool call
            toolCallAccumulator[tc.Index] = &core.ToolCall{
                ID:       tc.ID,
                Type:     tc.Type,
                Function: tc.Function,
            }
        }
    }
}

// Process complete tool calls
for _, tc := range toolCallAccumulator {
    fmt.Printf("\nTool call: %s(%s)\n", tc.Function.Name, tc.Function.Arguments)
}
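Once the stream has finished, the accumulated calls can be decoded and dispatched locally. This is a sketch, not part of the Iris API: it assumes weatherTool exposes a single "city" string argument under the name get_weather and that a local getWeather helper exists. Sending tool results back to the model is covered in the Tools guide.

// Hypothetical dispatch of the accumulated calls; the argument shape depends on
// how weatherTool was defined.
for _, tc := range toolCallAccumulator {
    if tc.Function.Name != "get_weather" {
        continue
    }
    var args struct {
        City string `json:"city"`
    }
    if err := json.Unmarshal([]byte(tc.Function.Arguments), &args); err != nil {
        log.Printf("invalid tool arguments: %v", err)
        continue
    }
    result := getWeather(args.City) // hypothetical local implementation
    fmt.Printf("get_weather(%q) -> %s\n", args.City, result)
}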

Integrate streaming with HTTP SSE for web applications.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
    "os"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }

    // Get prompt from query
    prompt := r.URL.Query().Get("prompt")
    if prompt == "" {
        prompt = "Hello, how can I help you?"
    }

    // Create client
    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider)

    // Start stream with request context (cancels on client disconnect)
    stream, err := client.Chat("gpt-4o").
        User(prompt).
        GetStream(r.Context())
    if err != nil {
        fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
        flusher.Flush()
        return
    }

    // Stream chunks as SSE events
    for chunk := range stream.Ch {
        data, _ := json.Marshal(map[string]string{
            "content": chunk.Content,
        })
        fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
        flusher.Flush()
    }

    // Check for errors
    if err := <-stream.Err; err != nil {
        fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
        flusher.Flush()
        return
    }

    // Send completion event
    final := <-stream.Final
    data, _ := json.Marshal(map[string]any{
        "done":  true,
        "usage": final.Usage,
    })
    fmt.Fprintf(w, "event: done\ndata: %s\n\n", data)
    flusher.Flush()
}

func main() {
    http.HandleFunc("/chat", sseHandler)
    http.ListenAndServe(":8080", nil)
}

A browser client can consume the stream with EventSource:

const eventSource = new EventSource('/chat?prompt=Hello');

eventSource.addEventListener('message', (event) => {
    const data = JSON.parse(event.data);
    document.getElementById('output').textContent += data.content;
});

eventSource.addEventListener('done', (event) => {
    const data = JSON.parse(event.data);
    console.log('Stream complete, tokens used:', data.usage.total_tokens);
    eventSource.close();
});

eventSource.addEventListener('error', (event) => {
    console.error('Stream error:', event);
    eventSource.close();
});

For bidirectional communication, use WebSockets:

package main

import (
    "context"
    "encoding/json"
    "log"
    "net/http"
    "os"

    "github.com/gorilla/websocket"
    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

var upgrader = websocket.Upgrader{
    CheckOrigin: func(r *http.Request) bool { return true },
}

type Message struct {
    Type    string `json:"type"`
    Content string `json:"content"`
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    defer conn.Close()

    provider := openai.New(os.Getenv("OPENAI_API_KEY"))
    client := core.NewClient(provider)

    for {
        // Read user message
        _, msgData, err := conn.ReadMessage()
        if err != nil {
            break
        }

        var msg Message
        if err := json.Unmarshal(msgData, &msg); err != nil {
            conn.WriteJSON(Message{Type: "error", Content: "invalid message"})
            continue
        }

        // Stream response
        stream, err := client.Chat("gpt-4o").
            User(msg.Content).
            GetStream(context.Background())
        if err != nil {
            conn.WriteJSON(Message{Type: "error", Content: err.Error()})
            continue
        }

        // Send chunks
        for chunk := range stream.Ch {
            conn.WriteJSON(Message{Type: "chunk", Content: chunk.Content})
        }

        // Check for errors
        if streamErr := <-stream.Err; streamErr != nil {
            conn.WriteJSON(Message{Type: "error", Content: streamErr.Error()})
        }

        // Send completion
        conn.WriteJSON(Message{Type: "done", Content: ""})
    }
}

func main() {
    http.HandleFunc("/ws", wsHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}
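A matching Go client is sketched below. It assumes the Message type above and the /ws route registered in main, and uses gorilla/websocket's DefaultDialer, WriteJSON, and ReadJSON; the function name and URL are illustrative.

func runClient(prompt string) error {
    conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/ws", nil)
    if err != nil {
        return err
    }
    defer conn.Close()

    // Send the prompt; the server above only reads Content, so Type is informational.
    if err := conn.WriteJSON(Message{Type: "prompt", Content: prompt}); err != nil {
        return err
    }

    // Print chunks until the server signals completion or an error.
    for {
        var msg Message
        if err := conn.ReadJSON(&msg); err != nil {
            return err
        }
        switch msg.Type {
        case "chunk":
            fmt.Print(msg.Content)
        case "error":
            return fmt.Errorf("server error: %s", msg.Content)
        case "done":
            fmt.Println()
            return nil
        }
    }
}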
To stream several prompts concurrently, run one stream per goroutine:

func MultiStream(ctx context.Context, client *core.Client, prompts []string) ([]string, error) {
    results := make([]string, len(prompts))
    errs := make([]error, len(prompts))

    var wg sync.WaitGroup
    for i, prompt := range prompts {
        wg.Add(1)
        go func(idx int, p string) {
            defer wg.Done()

            stream, err := client.Chat("gpt-4o").User(p).GetStream(ctx)
            if err != nil {
                errs[idx] = err
                return
            }

            var content strings.Builder
            for chunk := range stream.Ch {
                content.WriteString(chunk.Content)
            }

            if err := <-stream.Err; err != nil {
                errs[idx] = err
                return
            }
            results[idx] = content.String()
        }(i, prompt)
    }
    wg.Wait()

    // Check for any errors
    for _, err := range errs {
        if err != nil {
            return results, fmt.Errorf("one or more streams failed: %w", err)
        }
    }
    return results, nil
}
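MultiStream opens every stream at once; for large prompt lists you may want to cap concurrency. A sketch using a buffered-channel semaphore; the limit parameter, function name, and errors.Join (Go 1.20+) are choices made here, not Iris requirements.

func MultiStreamBounded(ctx context.Context, client *core.Client, prompts []string, limit int) ([]string, error) {
    results := make([]string, len(prompts))
    errs := make([]error, len(prompts))
    sem := make(chan struct{}, limit) // semaphore: at most `limit` streams in flight

    var wg sync.WaitGroup
    for i, prompt := range prompts {
        wg.Add(1)
        go func(idx int, p string) {
            defer wg.Done()
            sem <- struct{}{}        // acquire a slot
            defer func() { <-sem }() // release it

            stream, err := client.Chat("gpt-4o").User(p).GetStream(ctx)
            if err != nil {
                errs[idx] = err
                return
            }
            var content strings.Builder
            for chunk := range stream.Ch {
                content.WriteString(chunk.Content)
            }
            if err := <-stream.Err; err != nil {
                errs[idx] = err
                return
            }
            results[idx] = content.String()
        }(i, prompt)
    }
    wg.Wait()
    return results, errors.Join(errs...)
}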

Broadcast stream chunks to multiple consumers:

type StreamFanOut struct {
    subscribers []chan StreamChunk
    mu          sync.RWMutex
}

func NewStreamFanOut() *StreamFanOut {
    return &StreamFanOut{
        subscribers: make([]chan StreamChunk, 0),
    }
}

func (f *StreamFanOut) Subscribe() <-chan StreamChunk {
    ch := make(chan StreamChunk, 100)
    f.mu.Lock()
    f.subscribers = append(f.subscribers, ch)
    f.mu.Unlock()
    return ch
}

func (f *StreamFanOut) Broadcast(chunk StreamChunk) {
    f.mu.RLock()
    defer f.mu.RUnlock()
    for _, ch := range f.subscribers {
        select {
        case ch <- chunk:
        default:
            // Subscriber too slow, skip
        }
    }
}

func (f *StreamFanOut) Close() {
    f.mu.Lock()
    defer f.mu.Unlock()
    for _, ch := range f.subscribers {
        close(ch)
    }
}

// Usage
fanOut := NewStreamFanOut()

// Add subscribers
uiCh := fanOut.Subscribe()
logCh := fanOut.Subscribe()

// Process each subscription independently
go func() {
    for chunk := range uiCh {
        updateUI(chunk.Content)
    }
}()
go func() {
    for chunk := range logCh {
        logChunk(chunk)
    }
}()

// Broadcast chunks
stream, _ := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
for chunk := range stream.Ch {
    fanOut.Broadcast(chunk)
}
fanOut.Close()

// Per the guidance below, drain the error and final channels as well
if err := <-stream.Err; err != nil {
    log.Printf("stream error: %v", err)
}
<-stream.Final

Always drain all three channels; abandoning the error and final channels can leak the producing goroutine:

// Bad - goroutine leak
for chunk := range stream.Ch {
    fmt.Print(chunk.Content)
}
// Missing: <-stream.Err and <-stream.Final

// Good
for chunk := range stream.Ch {
    fmt.Print(chunk.Content)
}
if err := <-stream.Err; err != nil {
    log.Printf("Error: %v", err)
}
<-stream.Final // Even if you don't need it

Always use a context for timeout and cancellation control:

ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

stream, err := client.Chat(model).User(prompt).GetStream(ctx)

Builders are not thread-safe; use Clone() for concurrent streams:

builder := client.Chat("gpt-4o").System("You are helpful.")

go func() {
    stream, _ := builder.Clone().User("Question 1").GetStream(ctx)
    // process...
}()
go func() {
    stream, _ := builder.Clone().User("Question 2").GetStream(ctx)
    // process...
}()

Use buffered processing if the consumer is slower than the stream:

const bufferSize = 100
buffer := make(chan string, bufferSize)

go func() {
    for chunk := range stream.Ch {
        select {
        case buffer <- chunk.Content:
        default:
            log.Println("Warning: buffer full, dropping chunk")
        }
    }
    close(buffer)
}()

// Slow consumer
for content := range buffer {
    slowProcess(content)
}

Track chunk count, bytes, and duration to monitor stream performance:

start := time.Now()
var chunkCount int
var totalBytes int

for chunk := range stream.Ch {
    chunkCount++
    totalBytes += len(chunk.Content)
    fmt.Print(chunk.Content)
}

duration := time.Since(start)
log.Printf("Stream completed: %d chunks, %d bytes, %v duration",
    chunkCount, totalBytes, duration)
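To check the perceived-latency benefit mentioned at the top of this page, you can also measure time to first token. A minimal sketch using only the standard library; the variable names are local to the example.

start := time.Now()
var timeToFirstToken time.Duration

for chunk := range stream.Ch {
    if timeToFirstToken == 0 && chunk.Content != "" {
        timeToFirstToken = time.Since(start)
    }
    fmt.Print(chunk.Content)
}
if err := <-stream.Err; err != nil {
    log.Printf("stream error: %v", err)
}
log.Printf("time to first token: %v, total: %v", timeToFirstToken, time.Since(start))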

Tools Guide

Combine streaming with tool calling. Tools →