
Streaming Summaries

This example demonstrates how to build responsive applications that stream AI responses in real time. Streaming provides immediate feedback, enables cancellation, and creates a more interactive user experience for long-form content generation.

The example builds a streaming content system that:

  1. Renders tokens as they arrive for immediate feedback
  2. Tracks progress with estimated completion
  3. Supports cancellation for user control
  4. Handles errors gracefully mid-stream

| Aspect | Non-Streaming | Streaming |
| --- | --- | --- |
| Time to first token | 5-30 seconds | < 500ms |
| User feedback | None until complete | Immediate |
| Cancellation | Must wait | Instant |
| Memory for long responses | High | Low (incremental) |
# Install dependencies
go get github.com/petal-labs/iris
# Set up API key
iris keys set openai

package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strings"
    "sync"
    "syscall"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
)

// StreamingClient handles streaming responses
type StreamingClient struct {
    client *core.Client
    model  string
}

// StreamProgress tracks streaming state
type StreamProgress struct {
    TokensReceived int
    CharsReceived  int
    ElapsedTime    time.Duration
    IsComplete     bool
    Error          error
}

// StreamHandler processes streaming events
type StreamHandler interface {
    OnToken(token string)
    OnProgress(progress StreamProgress)
    OnComplete(response *core.Response)
    OnError(err error)
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Handle interrupt for graceful cancellation
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        fmt.Println("\n\nCancelling...")
        cancel()
    }()

    // Create client
    client, err := NewStreamingClient()
    if err != nil {
        log.Fatal(err)
    }

    // Example 1: Basic streaming to console
    fmt.Println("=== Basic Streaming ===")
    fmt.Println()
    err = client.StreamToConsole(ctx,
        "Write a comprehensive guide to Go error handling patterns. Include examples.",
    )
    if err != nil {
        if ctx.Err() == context.Canceled {
            fmt.Println("\n[Cancelled by user]")
        } else {
            log.Printf("Streaming error: %v", err)
        }
    }

    // Example 2: Streaming with progress
    fmt.Println("\n\n=== Streaming with Progress ===")
    fmt.Println()
    handler := &ConsoleHandler{}
    err = client.StreamWithHandler(ctx,
        "Summarize the key concepts of distributed systems in 500 words.",
        handler,
    )
    if err != nil && ctx.Err() != context.Canceled {
        log.Printf("Error: %v", err)
    }

    // Example 3: Parallel streaming
    fmt.Println("\n\n=== Parallel Streaming ===")
    topics := []string{
        "Explain Go channels",
        "Explain Go goroutines",
        "Explain Go mutexes",
    }
    results, err := client.StreamParallel(ctx, topics)
    if err != nil {
        log.Printf("Parallel error: %v", err)
    }
    for i, result := range results {
        fmt.Printf("\n--- Topic %d ---\n%s\n", i+1, truncate(result, 200))
    }
}

// NewStreamingClient creates a streaming-capable client
func NewStreamingClient() (*StreamingClient, error) {
    provider, err := openai.NewFromKeystore()
    if err != nil {
        provider, err = openai.NewFromEnv()
        if err != nil {
            return nil, fmt.Errorf("openai not configured: %w", err)
        }
    }

    client := core.NewClient(provider,
        core.WithRetryPolicy(&core.RetryPolicy{
            MaxRetries:        3,
            InitialInterval:   1 * time.Second,
            MaxInterval:       30 * time.Second,
            BackoffMultiplier: 2.0,
            RetryOn:           []int{429, 500, 503},
        }),
    )

    return &StreamingClient{
        client: client,
        model:  "gpt-4o",
    }, nil
}

// StreamToConsole streams a response directly to stdout
func (c *StreamingClient) StreamToConsole(ctx context.Context, prompt string) error {
    stream, err := c.client.Chat(c.model).
        System("You are a helpful technical writer. Provide clear, detailed explanations.").
        User(prompt).
        Temperature(0.7).
        GetStream(ctx)
    if err != nil {
        return fmt.Errorf("starting stream: %w", err)
    }

    // Process tokens as they arrive
    for chunk := range stream.Ch {
        fmt.Print(chunk.Content)
    }
    fmt.Println()

    // Check for streaming errors
    if err := <-stream.Err; err != nil {
        return fmt.Errorf("streaming error: %w", err)
    }

    // Get final response with usage stats
    final := <-stream.Final
    fmt.Printf("\n[Tokens: %d prompt, %d completion, %d total]\n",
        final.Usage.PromptTokens,
        final.Usage.CompletionTokens,
        final.Usage.TotalTokens,
    )
    return nil
}

// StreamWithHandler streams with a custom handler for events
func (c *StreamingClient) StreamWithHandler(ctx context.Context, prompt string, handler StreamHandler) error {
    startTime := time.Now()

    stream, err := c.client.Chat(c.model).
        System("You are a helpful assistant.").
        User(prompt).
        Temperature(0.7).
        GetStream(ctx)
    if err != nil {
        handler.OnError(fmt.Errorf("starting stream: %w", err))
        return err
    }

    var tokenCount int
    var charCount int

    // Process tokens with progress updates
    for chunk := range stream.Ch {
        handler.OnToken(chunk.Content)
        tokenCount++
        charCount += len(chunk.Content)

        // Report progress periodically
        if tokenCount%10 == 0 {
            handler.OnProgress(StreamProgress{
                TokensReceived: tokenCount,
                CharsReceived:  charCount,
                ElapsedTime:    time.Since(startTime),
                IsComplete:     false,
            })
        }
    }

    // Check for errors
    if err := <-stream.Err; err != nil {
        handler.OnError(err)
        return err
    }

    // Get final response
    final := <-stream.Final
    handler.OnProgress(StreamProgress{
        TokensReceived: tokenCount,
        CharsReceived:  charCount,
        ElapsedTime:    time.Since(startTime),
        IsComplete:     true,
    })
    handler.OnComplete(final)
    return nil
}

// StreamParallel runs multiple streams concurrently
func (c *StreamingClient) StreamParallel(ctx context.Context, prompts []string) ([]string, error) {
    results := make([]string, len(prompts))
    errs := make([]error, len(prompts))
    var wg sync.WaitGroup

    for i, prompt := range prompts {
        wg.Add(1)
        go func(idx int, p string) {
            defer wg.Done()

            var builder strings.Builder
            stream, err := c.client.Chat(c.model).
                System("Be concise.").
                User(p).
                MaxTokens(200).
                GetStream(ctx)
            if err != nil {
                errs[idx] = err
                return
            }

            for chunk := range stream.Ch {
                builder.WriteString(chunk.Content)
            }
            if err := <-stream.Err; err != nil {
                errs[idx] = err
                return
            }
            results[idx] = builder.String()
        }(i, prompt)
    }
    wg.Wait()

    // Check for any errors
    for _, err := range errs {
        if err != nil {
            return results, err
        }
    }
    return results, nil
}

// ConsoleHandler implements StreamHandler for console output
type ConsoleHandler struct {
    lastProgressTime time.Time
}

func (h *ConsoleHandler) OnToken(token string) {
    fmt.Print(token)
}

func (h *ConsoleHandler) OnProgress(progress StreamProgress) {
    // Only print progress every second to avoid clutter
    if time.Since(h.lastProgressTime) > time.Second {
        h.lastProgressTime = time.Now()
        fmt.Printf("\r[%d tokens, %.1fs elapsed]", progress.TokensReceived, progress.ElapsedTime.Seconds())
    }
}

func (h *ConsoleHandler) OnComplete(response *core.Response) {
    fmt.Printf("\n\n[Complete: %d total tokens]\n", response.Usage.TotalTokens)
}

func (h *ConsoleHandler) OnError(err error) {
    fmt.Printf("\n[Error: %v]\n", err)
}

func truncate(s string, maxLen int) string {
    if len(s) <= maxLen {
        return s
    }
    return s[:maxLen] + "..."
}

The streaming API returns a ChatStream with three channels:

stream, err := client.Chat(model).User(prompt).GetStream(ctx)
if err != nil {
    return err
}

// Channel 1: Token chunks
for chunk := range stream.Ch {
    fmt.Print(chunk.Content) // Print each token
}

// Channel 2: Error (blocks until stream ends)
if err := <-stream.Err; err != nil {
    return err
}

// Channel 3: Final response with full usage stats
final := <-stream.Final
fmt.Printf("Tokens used: %d\n", final.Usage.TotalTokens)

Each chunk contains:

type StreamChunk struct {
    Content string // The token text
    Type    string // "text", "thinking", etc.
    Index   int    // Position in stream
}
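
If a response mixes chunk types (for example "thinking" and "text"), you can branch on Type while accumulating only the visible content. A minimal sketch, assuming just the type values listed above:

var visible strings.Builder
for chunk := range stream.Ch {
    switch chunk.Type {
    case "thinking":
        // Reasoning tokens: show in a separate pane or discard
        continue
    default: // "text" and any other content types
        visible.WriteString(chunk.Content)
        fmt.Print(chunk.Content)
    }
}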

Cancellation is handled via context:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Or with manual cancellation
ctx, cancel := context.WithCancel(context.Background())
go func() {
    <-userCancelCh
    cancel() // This stops the stream
}()

stream, err := client.Chat(model).User(prompt).GetStream(ctx)
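
When the context is cancelled, the token channel closes early. A minimal sketch of telling user cancellation apart from other failures, following the main example's error handling (it assumes cancellation is visible via ctx.Err() once the stream ends):

for chunk := range stream.Ch {
    fmt.Print(chunk.Content) // partial output is still rendered
}
streamErr := <-stream.Err // drain the error channel either way
if ctx.Err() == context.Canceled {
    fmt.Println("\n[Cancelled by user]")
    return nil
}
if streamErr != nil {
    return streamErr
}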

For high-throughput applications, buffer the stream:

func streamWithBuffer(ctx context.Context, client *core.Client, prompt string) (string, error) {
    stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
    if err != nil {
        return "", err
    }

    // Pre-allocate buffer for expected size
    var buffer strings.Builder
    buffer.Grow(4096) // Estimate 4KB for typical response

    for chunk := range stream.Ch {
        buffer.WriteString(chunk.Content)
    }
    if err := <-stream.Err; err != nil {
        return buffer.String(), err // Return partial result
    }
    return buffer.String(), nil
}

Control output rate for smooth rendering:

func streamWithRateLimit(ctx context.Context, stream *core.ChatStream, minInterval time.Duration) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        var buffer strings.Builder
        ticker := time.NewTicker(minInterval)
        defer ticker.Stop()

        for {
            select {
            case chunk, ok := <-stream.Ch:
                if !ok {
                    // Flush remaining buffer
                    if buffer.Len() > 0 {
                        out <- buffer.String()
                    }
                    return
                }
                buffer.WriteString(chunk.Content)
            case <-ticker.C:
                // Flush buffer at rate limit interval
                if buffer.Len() > 0 {
                    out <- buffer.String()
                    buffer.Reset()
                }
            case <-ctx.Done():
                return
            }
        }
    }()
    return out
}

// Usage
smoothStream := streamWithRateLimit(ctx, stream, 50*time.Millisecond)
for text := range smoothStream {
    fmt.Print(text)
}

For long streams, implement retry with partial resume:

type StreamState struct {
    PartialContent string
    TokenCount     int
}

func streamWithRetry(ctx context.Context, client *core.Client, prompt string, maxRetries int) (string, error) {
    var state StreamState
    var lastErr error

    for attempt := 0; attempt < maxRetries; attempt++ {
        var resumePrompt string
        if state.PartialContent != "" {
            resumePrompt = fmt.Sprintf(`Continue from where you left off. You were writing:
---
%s
---
Continue immediately without repeating.`, state.PartialContent)
        } else {
            resumePrompt = prompt
        }

        stream, err := client.Chat("gpt-4o").User(resumePrompt).GetStream(ctx)
        if err != nil {
            lastErr = err
            time.Sleep(time.Duration(attempt+1) * time.Second)
            continue
        }

        var buffer strings.Builder
        buffer.WriteString(state.PartialContent) // keep content from earlier attempts
        for chunk := range stream.Ch {
            buffer.WriteString(chunk.Content)
            state.PartialContent = buffer.String()
            state.TokenCount++
        }
        if err := <-stream.Err; err != nil {
            lastErr = err
            continue
        }
        return state.PartialContent, nil
    }
    return state.PartialContent, fmt.Errorf("max retries exceeded: %w", lastErr)
}
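
A hypothetical call site, reusing the client from the earlier examples (the prompt and retry count are illustrative); note that the partial content is returned even when all retries fail:

report, err := streamWithRetry(ctx, client, "Write a detailed report on Go memory management.", 3)
if err != nil {
    log.Printf("stream failed after retries: %v (kept %d characters)", err, len(report))
}
fmt.Println(report)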

Fan out to multiple consumers:

func streamToMultiple(ctx context.Context, stream *core.ChatStream, destinations ...chan<- string) {
    go func() {
        // Close all destinations when the stream ends or the context is cancelled,
        // so consumers ranging over them can exit.
        defer func() {
            for _, dest := range destinations {
                close(dest)
            }
        }()

        for chunk := range stream.Ch {
            for _, dest := range destinations {
                select {
                case dest <- chunk.Content:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
}
// Usage
consoleCh := make(chan string)
logCh := make(chan string)
bufferCh := make(chan string)
streamToMultiple(ctx, stream, consoleCh, logCh, bufferCh)

// Each channel receives the same tokens; every destination needs a consumer,
// otherwise the fan-out goroutine blocks on the unread channel.
var transcript strings.Builder
go func() { for t := range consoleCh { fmt.Print(t) } }()
go func() { for t := range logCh { logger.Info("token", "content", t) } }() // logger: your structured logger
go func() { for t := range bufferCh { transcript.WriteString(t) } }()

In a chat application, tokens can be appended to the UI as they arrive; ChatUI here stands in for whatever interface the application's UI layer exposes:

func handleChatStream(ctx context.Context, client *core.Client, message string, ui ChatUI) error {
    ui.ShowTypingIndicator(true)
    defer ui.ShowTypingIndicator(false)

    stream, err := client.Chat("gpt-4o").
        System("You are a helpful assistant.").
        User(message).
        GetStream(ctx)
    if err != nil {
        return ui.ShowError(err)
    }

    ui.StartNewMessage()
    for chunk := range stream.Ch {
        ui.AppendText(chunk.Content)
    }
    if err := <-stream.Err; err != nil {
        return ui.ShowError(err)
    }

    final := <-stream.Final
    ui.ShowTokenCount(final.Usage.TotalTokens)
    return nil
}

For document summarization, approximate progress can be reported by comparing chunks received against a rough estimate of the expected output length:

func summarizeDocument(ctx context.Context, client *core.Client, document string, progress chan<- float64) (string, error) {
    // Estimate expected output length
    expectedTokens := len(document) / 20 // Rough estimate

    stream, err := client.Chat("gpt-4o").
        System("Summarize the following document concisely.").
        User(document).
        GetStream(ctx)
    if err != nil {
        return "", err
    }

    var result strings.Builder
    tokenCount := 0
    for chunk := range stream.Ch {
        result.WriteString(chunk.Content)
        tokenCount++

        // Report progress
        estimated := float64(tokenCount) / float64(expectedTokens)
        if estimated > 1.0 {
            estimated = 0.95 // Cap at 95% until done
        }
        progress <- estimated
    }
    if err := <-stream.Err; err != nil {
        return result.String(), err
    }
    progress <- 1.0 // Complete
    return result.String(), nil
}

Running the example produces output similar to the following:

=== Basic Streaming ===
# Go Error Handling Patterns
Go takes a unique approach to error handling that emphasizes explicit error
checking over exceptions. This guide covers the essential patterns...
[Content streams in real-time]
[Tokens: 45 prompt, 523 completion, 568 total]
=== Streaming with Progress ===
Distributed systems are computing environments where components...
[245 tokens, 3.2s elapsed]
...communicate over a network to achieve common goals.
[Complete: 312 total tokens]
=== Parallel Streaming ===
--- Topic 1 ---
Go channels are typed conduits through which you can send and receive values...
--- Topic 2 ---
Goroutines are lightweight threads managed by the Go runtime...
--- Topic 3 ---
A mutex (mutual exclusion lock) is a synchronization primitive...

| Practice | Recommendation |
| --- | --- |
| Always handle errors | Check both stream.Err and context errors |
| Use context for cancellation | Pass context to enable clean shutdown |
| Buffer for performance | Use strings.Builder for accumulation |
| Rate limit UI updates | Batch tokens for smooth rendering |
| Log partial results | Save progress for retry/resume scenarios |
| Clean up resources | Ensure stream channels are drained |
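
The last row is easy to miss: if a consumer abandons a stream early, something still has to drain the channels so the producing goroutine can exit. A small helper sketch, assuming all three ChatStream channels are closed (or sent to) once the stream ends, as in the examples above:

func drainStream(stream *core.ChatStream) {
    // Discard any remaining tokens
    for range stream.Ch {
    }
    <-stream.Err   // consume the terminal error, if any
    <-stream.Final // consume the final response
}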

For provider-specific streaming details, see the OpenAI Provider documentation.