Streaming Guide
Streaming delivers model output incrementally as it’s generated, enabling responsive UIs and real-time applications. Iris provides a streaming-first API with well-defined channels for content, errors, and final responses.
The ChatStream type provides three channels for stream processing:

type ChatStream struct {
	Ch    <-chan StreamChunk   // Incremental content chunks
	Err   <-chan error         // Error channel (at most one error)
	Final <-chan *ChatResponse // Aggregated response after completion
}
type StreamChunk struct {
	Content      string     // Text content delta
	ToolCalls    []ToolCall // Tool call deltas
	Usage        *Usage     // Token usage (final chunk only)
	FinishReason string     // Why generation stopped
}

A complete example that streams a chat response and prints each chunk as it arrives:

package main

import (
	"context"
	"fmt"
	"os"

	"github.com/petal-labs/iris/core"
	"github.com/petal-labs/iris/providers/openai"
)

func main() {
	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := core.NewClient(provider)

	stream, err := client.Chat("gpt-4o").
		System("You are a helpful assistant.").
		User("Write a short story about a robot learning to paint.").
		GetStream(context.Background())
	if err != nil {
		panic(err)
	}

	// Process chunks as they arrive
	for chunk := range stream.Ch {
		fmt.Print(chunk.Content) // Print without newline for streaming effect
	}
	fmt.Println() // Final newline

	// Always check for errors after stream completes
	if err := <-stream.Err; err != nil {
		panic(err)
	}

	// Get the final aggregated response
	final := <-stream.Final
	fmt.Printf("\nTotal tokens: %d\n", final.Usage.TotalTokens)
}

When you only need the final response and don’t need real-time processing:

stream, err := client.Chat("gpt-4o").
	User("Summarize this document").
	GetStream(ctx)
if err != nil {
	return nil, err
}

// DrainStream consumes all chunks and returns the final response
resp, err := core.DrainStream(ctx, stream)
if err != nil {
	return nil, err
}

fmt.Println(resp.Output)

Build up the full response while still processing incrementally:

var fullContent strings.Builder
for chunk := range stream.Ch {
	fullContent.WriteString(chunk.Content)

	// Update UI with each chunk
	updateUI(chunk.Content)
}

// Use accumulated content
processComplete(fullContent.String())

Track streaming progress for UI indicators:

type StreamProgress struct {
	ChunksReceived int
	TotalContent   int
	LastChunk      time.Time
}
progress := &StreamProgress{}

for chunk := range stream.Ch {
	progress.ChunksReceived++
	progress.TotalContent += len(chunk.Content)
	progress.LastChunk = time.Now()

	// Update progress indicator
	fmt.Printf("\rReceived %d chunks, %d characters...",
		progress.ChunksReceived, progress.TotalContent)

	// Process content
	processChunk(chunk.Content)
}

Process chunks in batches for efficiency:

const batchSize = 10

buffer := make([]StreamChunk, 0, batchSize)

for chunk := range stream.Ch {
	buffer = append(buffer, chunk)

	if len(buffer) >= batchSize {
		processBatch(buffer)
		buffer = buffer[:0] // Reset buffer
	}
}
// Process remaining chunks
if len(buffer) > 0 {
	processBatch(buffer)
}

Handle errors both when opening the stream and after it completes:

stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
if err != nil {
	// Handle initial connection error
	return fmt.Errorf("failed to start stream: %w", err)
}

for chunk := range stream.Ch {
	fmt.Print(chunk.Content)
}

// Check for streaming errors
if err := <-stream.Err; err != nil {
	var rateLimitErr *core.RateLimitError
	var serverErr *core.ServerError

	switch {
	case errors.As(err, &rateLimitErr):
		log.Printf("Rate limited during stream, retry after: %v", rateLimitErr.RetryAfter)
	case errors.As(err, &serverErr):
		log.Printf("Server error during stream: %s", serverErr.Message)
	default:
		log.Printf("Stream error: %v", err)
	}
}

Fall back to non-streaming on error:

func ChatWithFallback(ctx context.Context, client *core.Client, prompt string) (string, error) {
	// Try streaming first
	stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
	if err == nil {
		var content strings.Builder
		for chunk := range stream.Ch {
			content.WriteString(chunk.Content)
		}

		if streamErr := <-stream.Err; streamErr == nil {
			return content.String(), nil
		}
		// Stream failed mid-way, fall through to retry
	}

	// Fall back to non-streaming
	log.Println("Streaming failed, falling back to standard request")
	resp, err := client.Chat("gpt-4o").User(prompt).GetResponse(ctx)
	if err != nil {
		return "", err
	}
	return resp.Output, nil
}

Handle partial responses when the stream fails:

func StreamWithRecovery(ctx context.Context, client *core.Client, prompt string) (string, error) {
	var accumulated strings.Builder

	stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
	if err != nil {
		return "", err
	}

	for chunk := range stream.Ch {
		accumulated.WriteString(chunk.Content)
	}

	if err := <-stream.Err; err != nil {
		// Return partial content with error
		if accumulated.Len() > 0 {
			return accumulated.String(), fmt.Errorf("partial response (%d chars): %w", accumulated.Len(), err)
		}
		return "", err
	}

	return accumulated.String(), nil
}

Cancel streaming with context:

// Create cancellable context
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

stream, err := client.Chat("gpt-4o").
	User("Write a very long essay...").
	GetStream(ctx)
if err != nil {
	return err
}

// Process in goroutine
go func() {
	for chunk := range stream.Ch {
		fmt.Print(chunk.Content)
	}
}()

// Cancel after 5 seconds
time.Sleep(5 * time.Second)
cancel()

// Check for cancellation
if err := <-stream.Err; err != nil {
	if errors.Is(err, context.Canceled) {
		fmt.Println("\nStream cancelled by user")
	}
}

// Cancel if stream takes too long
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
if err != nil {
	return err
}

for chunk := range stream.Ch {
	fmt.Print(chunk.Content)
}

if err := <-stream.Err; err != nil {
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("\nStream timed out")
	}
}

Let the user stop a stream on demand by wiring a stop channel to context cancellation:

func StreamWithUserCancel(ctx context.Context, client *core.Client, prompt string, stopCh <-chan struct{}) error {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
	if err != nil {
		return err
	}

	for {
		select {
		case chunk, ok := <-stream.Ch:
			if !ok {
				// Stream completed
				return <-stream.Err
			}
			fmt.Print(chunk.Content)

		case <-stopCh:
			// User requested stop
			cancel()
			return nil
		}
	}
}
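
One way to drive stopCh is to close it from a signal handler so the user can stop generation with Ctrl+C. This is a sketch, not part of the library: it assumes ctx and client are in scope, that os, os/signal, and log are imported, and the prompt is illustrative.

// Close stopCh when the user presses Ctrl+C (SIGINT).
stopCh := make(chan struct{})
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, os.Interrupt)
go func() {
	<-sigCh
	close(stopCh)
}()

if err := StreamWithUserCancel(ctx, client, "Write a long essay...", stopCh); err != nil {
	log.Printf("stream failed: %v", err)
}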

Tool calls can be streamed incrementally:

stream, err := client.Chat("gpt-4o").
	System("You are a helpful assistant with access to tools.").
	User("What's the weather in Tokyo and New York?").
	Tools(weatherTool).
	GetStream(ctx)
if err != nil {
	return err
}
var toolCallAccumulator = make(map[int]*core.ToolCall)

for chunk := range stream.Ch {
	// Handle content
	if chunk.Content != "" {
		fmt.Print(chunk.Content)
	}

	// Handle tool call deltas
	for _, tc := range chunk.ToolCalls {
		if existing, ok := toolCallAccumulator[tc.Index]; ok {
			// Append to existing tool call
			existing.Function.Arguments += tc.Function.Arguments
		} else {
			// New tool call
			toolCallAccumulator[tc.Index] = &core.ToolCall{
				ID:       tc.ID,
				Type:     tc.Type,
				Function: tc.Function,
			}
		}
	}
}

// Process complete tool calls
for _, tc := range toolCallAccumulator {
	fmt.Printf("\nTool call: %s(%s)\n", tc.Function.Name, tc.Function.Arguments)
}
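
Once the stream has finished, each accumulated call can be dispatched to the matching tool. The sketch below is not part of the library: the "get_weather" tool name, the weatherArgs struct, and executeWeatherTool are assumptions for illustration (encoding/json is also assumed to be imported), and returning results to the model follows the tool-calling flow covered in the Tools guide.

// Sketch: dispatch accumulated calls; tool name, argument shape, and
// executeWeatherTool are illustrative assumptions.
type weatherArgs struct {
	Location string `json:"location"`
}

for _, tc := range toolCallAccumulator {
	if tc.Function.Name != "get_weather" { // assumed tool name
		continue
	}
	var args weatherArgs
	if err := json.Unmarshal([]byte(tc.Function.Arguments), &args); err != nil {
		fmt.Printf("invalid arguments for %s: %v\n", tc.Function.Name, err)
		continue
	}
	fmt.Printf("Weather for %s: %s\n", args.Location, executeWeatherTool(args.Location))
}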

Integrate streaming with HTTP SSE (server-sent events) for web applications:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"os"

	"github.com/petal-labs/iris/core"
	"github.com/petal-labs/iris/providers/openai"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Get prompt from query
	prompt := r.URL.Query().Get("prompt")
	if prompt == "" {
		prompt = "Hello, how can I help you?"
	}

	// Create client
	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := core.NewClient(provider)

	// Start stream with request context (cancels on client disconnect)
	stream, err := client.Chat("gpt-4o").
		User(prompt).
		GetStream(r.Context())
	if err != nil {
		fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
		flusher.Flush()
		return
	}

	// Stream chunks as SSE events
	for chunk := range stream.Ch {
		data, _ := json.Marshal(map[string]string{
			"content": chunk.Content,
		})
		fmt.Fprintf(w, "event: message\ndata: %s\n\n", data)
		flusher.Flush()
	}

	// Check for errors
	if err := <-stream.Err; err != nil {
		fmt.Fprintf(w, "event: error\ndata: %s\n\n", err.Error())
		flusher.Flush()
		return
	}

	// Send completion event
	final := <-stream.Final
	data, _ := json.Marshal(map[string]any{
		"done":  true,
		"usage": final.Usage,
	})
	fmt.Fprintf(w, "event: done\ndata: %s\n\n", data)
	flusher.Flush()
}

func main() {
	http.HandleFunc("/chat", sseHandler)
	http.ListenAndServe(":8080", nil)
}

On the browser side, consume the events with EventSource:

const eventSource = new EventSource('/chat?prompt=Hello');

eventSource.addEventListener('message', (event) => {
  const data = JSON.parse(event.data);
  document.getElementById('output').textContent += data.content;
});

eventSource.addEventListener('done', (event) => {
  const data = JSON.parse(event.data);
  console.log('Stream complete, tokens used:', data.usage.total_tokens);
  eventSource.close();
});

eventSource.addEventListener('error', (event) => {
  console.error('Stream error:', event);
  eventSource.close();
});

For bidirectional communication:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"

	"github.com/gorilla/websocket"
	"github.com/petal-labs/iris/core"
	"github.com/petal-labs/iris/providers/openai"
)

var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

type Message struct {
	Type    string `json:"type"`
	Content string `json:"content"`
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		return
	}
	defer conn.Close()

	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := core.NewClient(provider)

	for {
		// Read user message
		_, msgData, err := conn.ReadMessage()
		if err != nil {
			break
		}

		var msg Message
		json.Unmarshal(msgData, &msg)

		// Stream response
		stream, err := client.Chat("gpt-4o").
			User(msg.Content).
			GetStream(context.Background())
		if err != nil {
			conn.WriteJSON(Message{Type: "error", Content: err.Error()})
			continue
		}

		// Send chunks
		for chunk := range stream.Ch {
			conn.WriteJSON(Message{Type: "chunk", Content: chunk.Content})
		}

		// Check for errors
		if streamErr := <-stream.Err; streamErr != nil {
			conn.WriteJSON(Message{Type: "error", Content: streamErr.Error()})
		}

		// Send completion
		conn.WriteJSON(Message{Type: "done", Content: ""})
	}
}
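
A minimal Go client for this handler could look like the sketch below. It assumes wsHandler is registered at /ws on localhost:8080 (the example above does not show the route registration, e.g. http.HandleFunc("/ws", wsHandler)), and the prompt is illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

type Message struct {
	Type    string `json:"type"`
	Content string `json:"content"`
}

func main() {
	// Assumes wsHandler is served at ws://localhost:8080/ws.
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8080/ws", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// Send a prompt using the same Message envelope the handler decodes.
	if err := conn.WriteJSON(Message{Type: "user", Content: "Tell me a joke."}); err != nil {
		log.Fatal(err)
	}

	// Print chunks until the server signals completion.
	for {
		var msg Message
		if err := conn.ReadJSON(&msg); err != nil {
			log.Fatal(err)
		}
		switch msg.Type {
		case "chunk":
			fmt.Print(msg.Content)
		case "error":
			log.Fatalf("stream error: %s", msg.Content)
		case "done":
			fmt.Println()
			return
		}
	}
}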

Run several streams concurrently and collect the results:

func MultiStream(ctx context.Context, client *core.Client, prompts []string) ([]string, error) {
	results := make([]string, len(prompts))
	errors := make([]error, len(prompts))
	var wg sync.WaitGroup

	for i, prompt := range prompts {
		wg.Add(1)
		go func(idx int, p string) {
			defer wg.Done()

			stream, err := client.Chat("gpt-4o").User(p).GetStream(ctx)
			if err != nil {
				errors[idx] = err
				return
			}

			var content strings.Builder
			for chunk := range stream.Ch {
				content.WriteString(chunk.Content)
			}

			if err := <-stream.Err; err != nil {
				errors[idx] = err
				return
			}

			results[idx] = content.String()
		}(i, prompt)
	}

	wg.Wait()

	// Check for any errors
	for _, err := range errors {
		if err != nil {
			return results, fmt.Errorf("one or more streams failed: %w", err)
		}
	}

	return results, nil
}

Broadcast stream chunks to multiple consumers:

type StreamFanOut struct {
	subscribers []chan StreamChunk
	mu          sync.RWMutex
}

func NewStreamFanOut() *StreamFanOut {
	return &StreamFanOut{
		subscribers: make([]chan StreamChunk, 0),
	}
}

func (f *StreamFanOut) Subscribe() <-chan StreamChunk {
	ch := make(chan StreamChunk, 100)
	f.mu.Lock()
	f.subscribers = append(f.subscribers, ch)
	f.mu.Unlock()
	return ch
}

func (f *StreamFanOut) Broadcast(chunk StreamChunk) {
	f.mu.RLock()
	defer f.mu.RUnlock()

	for _, ch := range f.subscribers {
		select {
		case ch <- chunk:
		default:
			// Subscriber too slow, skip
		}
	}
}

func (f *StreamFanOut) Close() {
	f.mu.Lock()
	defer f.mu.Unlock()

	for _, ch := range f.subscribers {
		close(ch)
	}
}

// Usage
fanOut := NewStreamFanOut()

// Add subscribers
uiCh := fanOut.Subscribe()
logCh := fanOut.Subscribe()

// Process stream
go func() {
	for chunk := range uiCh {
		updateUI(chunk.Content)
	}
}()

go func() {
	for chunk := range logCh {
		logChunk(chunk)
	}
}()

// Broadcast chunks
stream, _ := client.Chat("gpt-4o").User(prompt).GetStream(ctx)
for chunk := range stream.Ch {
	fanOut.Broadcast(chunk)
}
fanOut.Close()

Always drain all three channels; leaving Err and Final unread can leak the goroutine feeding the stream:

// Bad - goroutine leak
for chunk := range stream.Ch {
	fmt.Print(chunk.Content)
}
// Missing: <-stream.Err and <-stream.Final

// Good
for chunk := range stream.Ch {
	fmt.Print(chunk.Content)
}
if err := <-stream.Err; err != nil {
	log.Printf("Error: %v", err)
}
<-stream.Final // Even if you don't need it

// Always use context for timeout/cancellation control
ctx, cancel := context.WithTimeout(ctx, 60*time.Second)
defer cancel()

stream, err := client.Chat(model).User(prompt).GetStream(ctx)

// Builders are not thread-safe
builder := client.Chat("gpt-4o").System("You are helpful.")

// Use Clone() for concurrent streams
go func() {
	stream, _ := builder.Clone().User("Question 1").GetStream(ctx)
	// process...
}()

go func() {
	stream, _ := builder.Clone().User("Question 2").GetStream(ctx)
	// process...
}()

// Use buffered processing if consumer is slow
const bufferSize = 100

buffer := make(chan string, bufferSize)

go func() {
	for chunk := range stream.Ch {
		select {
		case buffer <- chunk.Content:
		default:
			log.Println("Warning: buffer full, dropping chunk")
		}
	}
	close(buffer)
}()

// Slow consumer
for content := range buffer {
	slowProcess(content)
}

Track basic throughput metrics while streaming:

start := time.Now()
var chunkCount int
var totalBytes int

for chunk := range stream.Ch {
	chunkCount++
	totalBytes += len(chunk.Content)
	fmt.Print(chunk.Content)
}

duration := time.Since(start)
log.Printf("Stream completed: %d chunks, %d bytes, %v duration",
	chunkCount, totalBytes, duration)

Tools Guide
Combine streaming with tool calling. Tools →
Telemetry Guide
Monitor streaming performance. Telemetry →
Providers
Provider-specific streaming features. Providers →
Examples
See streaming examples in action. Examples →