Streaming Guide
This example demonstrates how to build responsive applications that stream AI responses in real time. Streaming provides immediate feedback, enables cancellation, and creates a more interactive user experience for long-form content generation.
The example builds a streaming content system that streams tokens to the console as they arrive, reports progress through a handler interface, runs multiple streams in parallel, and supports graceful cancellation. The table below compares streaming with a conventional blocking request:
| Aspect | Non-Streaming | Streaming |
|---|---|---|
| Time to first token | 5-30 seconds | < 500ms |
| User feedback | None until complete | Immediate |
| Cancellation | Must wait | Instant |
| Memory for long responses | High | Low (incremental) |
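To make the first row of the table concrete, here is a minimal sketch that times the first streamed chunk and relies on the context for cancellation. It assumes the same iris client setup and GetStream API used in the full example below; the prompt and timeout are illustrative.

```go
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

func main() {
    // Cancellation comes for free via the context deadline.
    ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
    defer cancel()

    provider, err := openai.NewFromEnv()
    if err != nil {
        log.Fatal(err)
    }
    client := core.NewClient(provider)

    start := time.Now()
    stream, err := client.Chat("gpt-4o").User("Explain streaming in one paragraph.").GetStream(ctx)
    if err != nil {
        log.Fatal(err)
    }

    first := true
    for chunk := range stream.Ch {
        if first {
            // Time to first token: typically well under a second.
            fmt.Printf("[first token after %s]\n", time.Since(start))
            first = false
        }
        fmt.Print(chunk.Content)
    }
    if err := <-stream.Err; err != nil {
        log.Fatal(err)
    }
}
```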
# Install dependencies
go get github.com/petal-labs/iris

# Set up API key
iris keys set openai

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

// StreamingClient handles streaming responses
type StreamingClient struct {
    client *core.Client
    model  string
}

// StreamProgress tracks streaming state
type StreamProgress struct {
    TokensReceived int
    CharsReceived  int
    ElapsedTime    time.Duration
    IsComplete     bool
    Error          error
}

// StreamHandler processes streaming events
type StreamHandler interface {
    OnToken(token string)
    OnProgress(progress StreamProgress)
    OnComplete(response *core.Response)
    OnError(err error)
}
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle interrupt for graceful cancellation
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        fmt.Println("\n\nCancelling...")
        cancel()
    }()

    // Create client
    client, err := NewStreamingClient()
    if err != nil {
        log.Fatal(err)
    }

    // Example 1: Basic streaming to console
    fmt.Println("=== Basic Streaming ===")
    fmt.Println()
    err = client.StreamToConsole(ctx,
        "Write a comprehensive guide to Go error handling patterns. Include examples.",
    )
    if err != nil {
        if ctx.Err() == context.Canceled {
            fmt.Println("\n[Cancelled by user]")
        } else {
            log.Printf("Streaming error: %v", err)
        }
    }

    // Example 2: Streaming with progress
    fmt.Println("\n\n=== Streaming with Progress ===")
    fmt.Println()
    handler := &ConsoleHandler{}
    err = client.StreamWithHandler(ctx,
        "Summarize the key concepts of distributed systems in 500 words.",
        handler,
    )
    if err != nil && ctx.Err() != context.Canceled {
        log.Printf("Error: %v", err)
    }

    // Example 3: Parallel streaming
    fmt.Println("\n\n=== Parallel Streaming ===")
    topics := []string{
        "Explain Go channels",
        "Explain Go goroutines",
        "Explain Go mutexes",
    }
    results, err := client.StreamParallel(ctx, topics)
    if err != nil {
        log.Printf("Parallel error: %v", err)
    }
    for i, result := range results {
        fmt.Printf("\n--- Topic %d ---\n%s\n", i+1, truncate(result, 200))
    }
}

// NewStreamingClient creates a streaming-capable client
func NewStreamingClient() (*StreamingClient, error) {
    provider, err := openai.NewFromKeystore()
    if err != nil {
        provider, err = openai.NewFromEnv()
        if err != nil {
            return nil, fmt.Errorf("openai not configured: %w", err)
        }
    }

    client := core.NewClient(provider,
        core.WithRetryPolicy(&core.RetryPolicy{
            MaxRetries:        3,
            InitialInterval:   1 * time.Second,
            MaxInterval:       30 * time.Second,
            BackoffMultiplier: 2.0,
            RetryOn:           []int{429, 500, 503},
        }),
    )

    return &StreamingClient{
        client: client,
        model:  "gpt-4o",
    }, nil
}
// StreamToConsole streams a response directly to stdout
func (c *StreamingClient) StreamToConsole(ctx context.Context, prompt string) error {
    stream, err := c.client.Chat(c.model).
        System("You are a helpful technical writer. Provide clear, detailed explanations.").
        User(prompt).
        Temperature(0.7).
        GetStream(ctx)
    if err != nil {
        return fmt.Errorf("starting stream: %w", err)
    }

    // Process tokens as they arrive
    for chunk := range stream.Ch {
        fmt.Print(chunk.Content)
    }
    fmt.Println()

    // Check for streaming errors
    if err := <-stream.Err; err != nil {
        return fmt.Errorf("streaming error: %w", err)
    }

    // Get final response with usage stats
    final := <-stream.Final
    fmt.Printf("\n[Tokens: %d prompt, %d completion, %d total]\n",
        final.Usage.PromptTokens,
        final.Usage.CompletionTokens,
        final.Usage.TotalTokens,
    )

    return nil
}
// StreamWithHandler streams with a custom handler for events
func (c *StreamingClient) StreamWithHandler(ctx context.Context, prompt string, handler StreamHandler) error {
    startTime := time.Now()

    stream, err := c.client.Chat(c.model).
        System("You are a helpful assistant.").
        User(prompt).
        Temperature(0.7).
        GetStream(ctx)
    if err != nil {
        handler.OnError(fmt.Errorf("starting stream: %w", err))
        return err
    }

    var tokenCount int
    var charCount int

    // Process tokens with progress updates
    for chunk := range stream.Ch {
        handler.OnToken(chunk.Content)
        tokenCount++
        charCount += len(chunk.Content)

        // Report progress periodically
        if tokenCount%10 == 0 {
            handler.OnProgress(StreamProgress{
                TokensReceived: tokenCount,
                CharsReceived:  charCount,
                ElapsedTime:    time.Since(startTime),
                IsComplete:     false,
            })
        }
    }

    // Check for errors
    if err := <-stream.Err; err != nil {
        handler.OnError(err)
        return err
    }

    // Get final response
    final := <-stream.Final

    handler.OnProgress(StreamProgress{
        TokensReceived: tokenCount,
        CharsReceived:  charCount,
        ElapsedTime:    time.Since(startTime),
        IsComplete:     true,
    })

    handler.OnComplete(final)
    return nil
}
// StreamParallel runs multiple streams concurrently
func (c *StreamingClient) StreamParallel(ctx context.Context, prompts []string) ([]string, error) {
    results := make([]string, len(prompts))
    errors := make([]error, len(prompts))
    var wg sync.WaitGroup

    for i, prompt := range prompts {
        wg.Add(1)
        go func(idx int, p string) {
            defer wg.Done()

            var builder strings.Builder
            stream, err := c.client.Chat(c.model).
                System("Be concise.").
                User(p).
                MaxTokens(200).
                GetStream(ctx)
            if err != nil {
                errors[idx] = err
                return
            }

            for chunk := range stream.Ch {
                builder.WriteString(chunk.Content)
            }

            if err := <-stream.Err; err != nil {
                errors[idx] = err
                return
            }

            results[idx] = builder.String()
        }(i, prompt)
    }

    wg.Wait()

    // Check for any errors
    for _, err := range errors {
        if err != nil {
            return results, err
        }
    }

    return results, nil
}
// ConsoleHandler implements StreamHandler for console output
type ConsoleHandler struct {
    lastProgressTime time.Time
}

func (h *ConsoleHandler) OnToken(token string) {
    fmt.Print(token)
}

func (h *ConsoleHandler) OnProgress(progress StreamProgress) {
    // Only print progress every second to avoid clutter
    if time.Since(h.lastProgressTime) > time.Second {
        h.lastProgressTime = time.Now()
        fmt.Printf("\r[%d tokens, %.1fs elapsed]", progress.TokensReceived, progress.ElapsedTime.Seconds())
    }
}

func (h *ConsoleHandler) OnComplete(response *core.Response) {
    fmt.Printf("\n\n[Complete: %d total tokens]\n", response.Usage.TotalTokens)
}

func (h *ConsoleHandler) OnError(err error) {
    fmt.Printf("\n[Error: %v]\n", err)
}

func truncate(s string, maxLen int) string {
    if len(s) <= maxLen {
        return s
    }
    return s[:maxLen] + "..."
}

Streaming with Server-Sent Events (SSE):

package main

import (
    "encoding/json"
    "fmt"
    "net/http"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

type StreamEvent struct {
    Type    string `json:"type"`
    Content string `json:"content,omitempty"`
    Done    bool   `json:"done,omitempty"`
    Usage   *Usage `json:"usage,omitempty"`
}

type Usage struct {
    PromptTokens     int `json:"prompt_tokens"`
    CompletionTokens int `json:"completion_tokens"`
    TotalTokens      int `json:"total_tokens"`
}
func main() {
    http.HandleFunc("/stream", streamHandler)
    fmt.Println("Server running on :8080")
    http.ListenAndServe(":8080", nil)
}

func streamHandler(w http.ResponseWriter, r *http.Request) {
    // Set SSE headers
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")
    w.Header().Set("Access-Control-Allow-Origin", "*")

    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "Streaming not supported", http.StatusInternalServerError)
        return
    }

    // Get prompt from request
    prompt := r.URL.Query().Get("prompt")
    if prompt == "" {
        prompt = "Tell me a story"
    }

    // Create context that cancels when client disconnects
    ctx := r.Context()

    // Initialize client
    provider, _ := openai.NewFromEnv()
    client := core.NewClient(provider)

    // Start streaming
    stream, err := client.Chat("gpt-4o").
        User(prompt).
        GetStream(ctx)
    if err != nil {
        sendSSE(w, flusher, StreamEvent{Type: "error", Content: err.Error()})
        return
    }

    // Stream tokens
    for chunk := range stream.Ch {
        event := StreamEvent{Type: "token", Content: chunk.Content}
        sendSSE(w, flusher, event)
    }

    // Check for errors
    if err := <-stream.Err; err != nil {
        sendSSE(w, flusher, StreamEvent{Type: "error", Content: err.Error()})
        return
    }

    // Send completion with usage
    final := <-stream.Final
    sendSSE(w, flusher, StreamEvent{
        Type: "done",
        Done: true,
        Usage: &Usage{
            PromptTokens:     final.Usage.PromptTokens,
            CompletionTokens: final.Usage.CompletionTokens,
            TotalTokens:      final.Usage.TotalTokens,
        },
    })
}
func sendSSE(w http.ResponseWriter, flusher http.Flusher, event StreamEvent) {
    data, _ := json.Marshal(event)
    fmt.Fprintf(w, "data: %s\n\n", data)
    flusher.Flush()
}
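Browsers typically consume this endpoint with EventSource. For testing from Go, here is a minimal sketch of a client that reads the same stream using only the standard library; the port and prompt are illustrative, and the event shape mirrors the StreamEvent type above.

```go
package main

import (
    "bufio"
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "net/url"
    "strings"
)

type streamEvent struct {
    Type    string `json:"type"`
    Content string `json:"content,omitempty"`
    Done    bool   `json:"done,omitempty"`
}

func main() {
    prompt := url.QueryEscape("Explain SSE in two sentences")
    resp, err := http.Get("http://localhost:8080/stream?prompt=" + prompt)
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Body.Close()

    scanner := bufio.NewScanner(resp.Body)
    for scanner.Scan() {
        line := scanner.Text()
        if !strings.HasPrefix(line, "data: ") {
            continue // skip the blank separator lines between events
        }
        var ev streamEvent
        if err := json.Unmarshal([]byte(strings.TrimPrefix(line, "data: ")), &ev); err != nil {
            log.Printf("bad event: %v", err)
            continue
        }
        switch ev.Type {
        case "token":
            fmt.Print(ev.Content)
        case "error":
            log.Fatalf("stream error: %s", ev.Content)
        case "done":
            fmt.Println()
            return
        }
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}
```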
Streaming with Claude:

import "github.com/petal-labs/iris/providers/anthropic"

func NewClaudeStreamingClient() (*StreamingClient, error) {
    provider, err := anthropic.NewFromKeystore()
    if err != nil {
        return nil, err
    }

    client := core.NewClient(provider)

    return &StreamingClient{
        client: client,
        model:  "claude-sonnet-4-20250514",
    }, nil
}

// Stream with extended thinking (Claude)
func (c *StreamingClient) StreamWithThinking(ctx context.Context, prompt string) error {
    stream, err := c.client.Chat(c.model).
        System("You are a helpful assistant.").
        User(prompt).
        ExtendedThinking(true).
        ThinkingBudget(5000).
        GetStream(ctx)
    if err != nil {
        return err
    }

    // Claude streams thinking first, then response
    fmt.Println("=== Thinking ===")
    inThinking := true

    for chunk := range stream.Ch {
        // Check if we've transitioned from thinking to response
        if inThinking && chunk.Type == "text" {
            fmt.Println("\n\n=== Response ===")
            inThinking = false
        }
        fmt.Print(chunk.Content)
    }

    if err := <-stream.Err; err != nil {
        return err
    }

    return nil
}

The streaming API returns a ChatStream with three channels:
stream, err := client.Chat(model).User(prompt).GetStream(ctx)
if err != nil {
    return err
}

// Channel 1: Token chunks
for chunk := range stream.Ch {
    fmt.Print(chunk.Content) // Print each token
}

// Channel 2: Error (blocks until stream ends)
if err := <-stream.Err; err != nil {
    return err
}

// Channel 3: Final response with full usage stats
final := <-stream.Final
fmt.Printf("Tokens used: %d\n", final.Usage.TotalTokens)

Each chunk contains:

type StreamChunk struct {
    Content string // The token text
    Type    string // "text", "thinking", etc.
    Index   int    // Position in stream
}

Cancellation is handled via context:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Or with manual cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
    <-userCancelCh
    cancel() // This stops the stream
}()

stream, err := client.Chat(model).User(prompt).GetStream(ctx)

For high-throughput applications, buffer the stream:
func streamWithBuffer(ctx context.Context, client *core.Client, prompt string) (string, error) {
    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err != nil {
        return "", err
    }

    // Pre-allocate buffer for expected size
    var buffer strings.Builder
    buffer.Grow(4096) // Estimate 4KB for typical response

    for chunk := range stream.Ch {
        buffer.WriteString(chunk.Content)
    }

    if err := <-stream.Err; err != nil {
        return buffer.String(), err // Return partial result
    }

    return buffer.String(), nil
}

Control output rate for smooth rendering:

func streamWithRateLimit(ctx context.Context, stream *core.ChatStream, minInterval time.Duration) <-chan string {
    out := make(chan string)

    go func() {
        defer close(out)
        var buffer strings.Builder
        ticker := time.NewTicker(minInterval)
        defer ticker.Stop()

        for {
            select {
            case chunk, ok := <-stream.Ch:
                if !ok {
                    // Flush remaining buffer
                    if buffer.Len() > 0 {
                        out <- buffer.String()
                    }
                    return
                }
                buffer.WriteString(chunk.Content)

            case <-ticker.C:
                // Flush buffer at rate limit interval
                if buffer.Len() > 0 {
                    out <- buffer.String()
                    buffer.Reset()
                }

            case <-ctx.Done():
                return
            }
        }
    }()

    return out
}

// Usage
smoothStream := streamWithRateLimit(ctx, stream, 50*time.Millisecond)
for text := range smoothStream {
    fmt.Print(text)
}

For long streams, implement retry with partial resume:
type StreamState struct {
    PartialContent string
    TokenCount     int
}

func streamWithRetry(ctx context.Context, client *core.Client, prompt string, maxRetries int) (string, error) {
    var state StreamState
    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        var resumePrompt string
        if state.PartialContent != "" {
            resumePrompt = fmt.Sprintf(`Continue from where you left off. You were writing:
---
%s
---
Continue immediately without repeating.`, state.PartialContent)
        } else {
            resumePrompt = prompt
        }

        stream, err := client.Chat("gpt-4o").User(resumePrompt).GetStream(ctx)
        if err != nil {
            lastErr = err
            time.Sleep(time.Duration(attempt+1) * time.Second)
            continue
        }

        var buffer strings.Builder
        buffer.WriteString(state.PartialContent) // keep content from earlier attempts
        for chunk := range stream.Ch {
            buffer.WriteString(chunk.Content)
            state.PartialContent = buffer.String()
            state.TokenCount++
        }

        if err := <-stream.Err; err != nil {
            lastErr = err
            continue
        }

        return state.PartialContent, nil
    }

    return state.PartialContent, fmt.Errorf("max retries exceeded: %w", lastErr)
}

Fan out to multiple consumers:
func streamToMultiple(ctx context.Context, stream *core.ChatStream, destinations ...chan<- string) {
    go func() {
        // Close all destinations when done, even if the context is cancelled
        // mid-stream, so consumers are never left blocked.
        defer func() {
            for _, dest := range destinations {
                close(dest)
            }
        }()

        for chunk := range stream.Ch {
            for _, dest := range destinations {
                select {
                case dest <- chunk.Content:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
}

// Usage
consoleCh := make(chan string)
logCh := make(chan string)
bufferCh := make(chan string)

streamToMultiple(ctx, stream, consoleCh, logCh, bufferCh)

// Each channel receives the same tokens
go func() {
    for t := range consoleCh {
        fmt.Print(t)
    }
}()
go func() {
    for t := range logCh {
        logger.Info("token", "content", t)
    }
}()

Streaming into a chat UI:

func handleChatStream(ctx context.Context, client *core.Client, message string, ui ChatUI) error {
    ui.ShowTypingIndicator(true)
    defer ui.ShowTypingIndicator(false)
stream, err := client.Chat("gpt-4o"). System("You are a helpful assistant."). User(message). GetStream(ctx)
if err != nil { return ui.ShowError(err) }
ui.StartNewMessage()
for chunk := range stream.Ch { ui.AppendText(chunk.Content) }
if err := <-stream.Err; err != nil { return ui.ShowError(err) }
final := <-stream.Final ui.ShowTokenCount(final.Usage.TotalTokens)
return nil}func summarizeDocument(ctx context.Context, client *core.Client, document string, progress chan<- float64) (string, error) { // Estimate expected output length expectedTokens := len(document) / 20 // Rough estimate
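The iris library does not define ChatUI; it stands in for whatever presentation layer you drive. A minimal sketch of the interface, with the method set inferred from the calls in handleChatStream above:

```go
// ChatUI is the surface handleChatStream expects from the presentation layer.
// Hypothetical: the methods simply mirror the calls made in the example.
type ChatUI interface {
    ShowTypingIndicator(on bool)
    StartNewMessage()
    AppendText(text string)
    ShowTokenCount(tokens int)
    ShowError(err error) error // returns the error so callers can propagate it
}
```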
stream, err := client.Chat("gpt-4o"). System("Summarize the following document concisely."). User(document). GetStream(ctx)
if err != nil { return "", err }
var result strings.Builder tokenCount := 0
for chunk := range stream.Ch { result.WriteString(chunk.Content) tokenCount++
// Report progress estimated := float64(tokenCount) / float64(expectedTokens) if estimated > 1.0 { estimated = 0.95 // Cap at 95% until done } progress <- estimated }
if err := <-stream.Err; err != nil { return result.String(), err }
progress <- 1.0 // Complete return result.String(), nil}=== Basic Streaming ===
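Because progress is unbuffered, the caller must drain it concurrently or summarizeDocument will block on its first send. A minimal usage sketch; the progress-bar formatting is illustrative:

```go
progress := make(chan float64)

// Drain progress updates concurrently; summarizeDocument blocks on an
// unconsumed progress channel otherwise.
go func() {
    for p := range progress {
        fmt.Printf("\rSummarizing... %3.0f%%", p*100)
    }
    fmt.Println()
}()

summary, err := summarizeDocument(ctx, client, document, progress)
close(progress) // lets the progress reader finish
if err != nil {
    log.Fatal(err)
}
fmt.Println(summary)
```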
Example output:

=== Basic Streaming ===

# Go Error Handling Patterns

Go takes a unique approach to error handling that emphasizes explicit error checking over exceptions. This guide covers the essential patterns...

[Content streams in real-time]

[Tokens: 45 prompt, 523 completion, 568 total]

=== Streaming with Progress ===

Distributed systems are computing environments where components...[245 tokens, 3.2s elapsed]...communicate over a network to achieve common goals.

[Complete: 312 total tokens]

=== Parallel Streaming ===

--- Topic 1 ---
Go channels are typed conduits through which you can send and receive values...

--- Topic 2 ---
Goroutines are lightweight threads managed by the Go runtime...

--- Topic 3 ---
A mutex (mutual exclusion lock) is a synchronization primitive...

Best practices:

| Practice | Recommendation |
|---|---|
| Always handle errors | Check both stream.Err and context errors (see the sketch after this table) |
| Use context for cancellation | Pass context to enable clean shutdown |
| Buffer for performance | Use strings.Builder for accumulation |
| Rate limit UI updates | Batch tokens for smooth rendering |
| Log partial results | Save progress for retry/resume scenarios |
| Clean up resources | Ensure stream channels are drained |
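As a concrete instance of the first two rows, here is a small sketch, in the same snippet style as the patterns above, of a helper that reads stream.Err once the token loop has finished and separates deliberate cancellation from genuine failures. The helper name and error messages are illustrative.

```go
// finishStream is called after the stream.Ch loop has finished. It reads the
// error channel exactly once and distinguishes cancellation and timeouts from
// provider-side streaming failures.
func finishStream(ctx context.Context, stream *core.ChatStream) error {
    streamErr := <-stream.Err // blocks until the stream has ended

    switch {
    case ctx.Err() == context.Canceled:
        return nil // cancelled on purpose; not a failure worth surfacing
    case ctx.Err() == context.DeadlineExceeded:
        return fmt.Errorf("stream timed out: %w", ctx.Err())
    case streamErr != nil:
        return fmt.Errorf("streaming error: %w", streamErr)
    default:
        return nil
    }
}
```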