# LLM Nodes With Iris
PetalFlow integrates with Iris to power LLM operations. The `irisadapter` package bridges Iris providers into PetalFlow nodes, giving you access to all Iris features while keeping orchestration logic in your graphs.
## Provider Setup

### Installing the Adapter

```sh
go get github.com/petal-labs/petalflow/irisadapter
```
### Creating a Provider Adapter

The adapter wraps any Iris provider for use in PetalFlow nodes:
OpenAI:

```go
import (
	"os"

	"github.com/petal-labs/iris/providers/openai"
	"github.com/petal-labs/petalflow/irisadapter"
)

provider := openai.New(os.Getenv("OPENAI_API_KEY"))
client := irisadapter.NewProviderAdapter(provider)
```

Anthropic:

```go
import (
	"os"

	"github.com/petal-labs/iris/providers/anthropic"
	"github.com/petal-labs/petalflow/irisadapter"
)

provider := anthropic.New(os.Getenv("ANTHROPIC_API_KEY"))
client := irisadapter.NewProviderAdapter(provider)
```

Ollama:

```go
import (
	"github.com/petal-labs/iris/providers/ollama"
	"github.com/petal-labs/petalflow/irisadapter"
)

provider := ollama.New(ollama.WithBaseURL("http://localhost:11434"))
client := irisadapter.NewProviderAdapter(provider)
```
## Basic LLM Node

Create an LLM node with a provider adapter:
```go
llmNode := petalflow.NewLLMNode("chat", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	SystemPrompt:   "You are a helpful assistant.",
	PromptTemplate: "{{.Vars.question}}",
	OutputKey:      "answer",
})
```
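As a quick usage sketch, the node can be dropped into a graph and run with the runtime APIs shown later on this page. This assumes the runtime writes the node's `OutputKey` back onto the envelope passed to `Run`; the question text is illustrative:

```go
// Build a one-node graph around the chat node.
g := petalflow.NewGraph("basic-chat")
g.AddNode(llmNode)

// Seed the envelope variable referenced by the prompt template.
env := petalflow.NewEnvelope()
env.SetVar("question", "What does the irisadapter package do?")

// Run the graph; the reply should land under the "answer" key.
runtime := petalflow.NewRuntime()
runtime.Run(ctx, g, env, petalflow.RunOptions{})
fmt.Println(env.GetVar("answer"))
```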
Section titled “Configuration Options”| Field | Type | Description |
|---|---|---|
Model | string | Model identifier (provider-specific) |
SystemPrompt | string | System message for the conversation |
PromptTemplate | string | Go template for user prompt |
OutputKey | string | Envelope key for storing response |
Temperature | float64 | Sampling temperature (0.0-2.0) |
MaxTokens | int | Maximum response tokens |
TopP | float64 | Nucleus sampling parameter |
StopSequences | []string | Sequences that stop generation |
ResponseFormat | ResponseFormat | JSON mode or structured output |
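For illustration, a config that sets several of the optional sampling fields (the specific values are arbitrary):

```go
llmNode := petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	PromptTemplate: "Summarize: {{.Vars.document}}",
	OutputKey:      "summary",
	Temperature:    0.2,              // lower temperature for more deterministic output
	MaxTokens:      512,              // cap the response length
	TopP:           0.9,              // nucleus sampling cutoff
	StopSequences:  []string{"\n\n"}, // stop at the first blank line
})
```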
## Prompt Templates

Templates use Go's `text/template` syntax with access to the envelope.

### Accessing Variables
```go
config := petalflow.LLMNodeConfig{
	PromptTemplate: `Analyze the following customer message:

Customer: {{.Vars.customer_name}}
Message: {{.Vars.message}}
Previous interactions: {{.Vars.interaction_count}}

Provide a sentiment analysis and suggested response.`,
}
```
### Conditional Content

```go
config := petalflow.LLMNodeConfig{
	PromptTemplate: `{{if .Vars.context}}Context: {{.Vars.context}}
{{end}}Question: {{.Vars.question}}

{{if .Vars.format_instructions}}{{.Vars.format_instructions}}{{end}}`,
}
```
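The `{{if}}` branches render only when the corresponding envelope variables are set. Assuming `Vars` behaves like a map (so an unset key is falsy in the template), seeding it might look like this, with illustrative values:

```go
env := petalflow.NewEnvelope()
env.SetVar("question", "What is the refund policy?")
// "context" is left unset, so the {{if .Vars.context}} branch renders nothing.
env.SetVar("format_instructions", "Answer in two sentences.")
```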
### Iterating Over Lists

```go
config := petalflow.LLMNodeConfig{
	PromptTemplate: `Based on these search results:

{{range $i, $doc := .Vars.documents}}[{{$i}}] {{$doc.title}}
{{$doc.content}}
{{end}}
Answer the question: {{.Vars.query}}`,
}
```
### Including Message History

```go
config := petalflow.LLMNodeConfig{
	PromptTemplate: `Previous conversation:
{{range .Messages}}{{.Role}}: {{.Content}}
{{end}}
User: {{.Vars.user_input}}`,
}
```
## Streaming Responses

Enable streaming for real-time output:
```go
llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	PromptTemplate: "{{.Vars.prompt}}",
	OutputKey:      "response",
	Streaming:      true,
	OnToken: func(token string) {
		fmt.Print(token) // Print tokens as they arrive
	},
})
```
Section titled “Stream to Channel”tokens := make(chan string, 100)
llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{ Model: "gpt-4o", PromptTemplate: "{{.Vars.prompt}}", OutputKey: "response", Streaming: true, OnToken: func(token string) { tokens <- token }, OnComplete: func() { close(tokens) },})
// Consumer goroutinego func() { for token := range tokens { // Process tokens (e.g., send to WebSocket client) websocket.Send(token) }}()Stream with Aggregation
### Stream with Aggregation

Collect the full response while also streaming:
```go
var fullResponse strings.Builder

llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	PromptTemplate: "{{.Vars.prompt}}",
	OutputKey:      "response",
	Streaming:      true,
	OnToken: func(token string) {
		fullResponse.WriteString(token)
		streamToClient(token)
	},
})
```
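Once the run completes, `fullResponse.String()` holds the aggregated text, and the response should still be written to the `response` output key as with non-streaming nodes.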
## Multi-Provider Setup

Use different providers for different tasks in the same graph:
```go
// Fast model for classification
classifyClient := irisadapter.NewProviderAdapter(
	openai.New(os.Getenv("OPENAI_API_KEY")),
)

// Powerful model for generation
generateClient := irisadapter.NewProviderAdapter(
	anthropic.New(os.Getenv("ANTHROPIC_API_KEY")),
)

// Local model for embeddings
embedClient := irisadapter.NewProviderAdapter(
	ollama.New(ollama.WithBaseURL("http://localhost:11434")),
)

// Build graph with multiple providers
g := petalflow.NewGraph("multi-provider")

g.AddNode(petalflow.NewLLMNode("classify", classifyClient, petalflow.LLMNodeConfig{
	Model:          "gpt-4o-mini",
	PromptTemplate: "Classify this text: {{.Vars.input}}",
	OutputKey:      "classification",
}))

g.AddNode(petalflow.NewLLMNode("generate", generateClient, petalflow.LLMNodeConfig{
	Model:          "claude-3-5-sonnet-20241022",
	PromptTemplate: "Based on classification {{.Vars.classification}}, generate: ...",
	OutputKey:      "response",
}))

g.AddNode(petalflow.NewLLMNode("embed", embedClient, petalflow.LLMNodeConfig{
	Model:          "nomic-embed-text",
	PromptTemplate: "{{.Vars.response}}",
	OutputKey:      "embedding",
}))
```
## Structured Output

### JSON Mode

Request JSON-formatted responses:
```go
llmNode := petalflow.NewLLMNode("extract", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	SystemPrompt:   "Extract entities from the text. Respond in JSON format.",
	PromptTemplate: "Text: {{.Vars.text}}",
	OutputKey:      "entities",
	ResponseFormat: petalflow.ResponseFormatJSON,
})
```
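A downstream node can then decode the JSON stored under `entities`. A minimal sketch using the `TransformNode` API shown later on this page, assuming the model output arrives as a string input:

```go
parseNode := petalflow.NewTransformNode("parse_entities", petalflow.TransformNodeConfig{
	Transform: func(inputs map[string]any) (any, error) {
		raw, _ := inputs["entities"].(string)
		var entities map[string]any
		if err := json.Unmarshal([]byte(raw), &entities); err != nil {
			return nil, fmt.Errorf("model returned invalid JSON: %w", err)
		}
		return entities, nil
	},
	OutputKey: "parsed_entities",
})
```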
### Structured Output with Schema

For providers that support it, define an output schema:
```go
llmNode := petalflow.NewLLMNode("extract", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	PromptTemplate: "Extract contact info from: {{.Vars.text}}",
	OutputKey:      "contact",
	ResponseFormat: petalflow.ResponseFormatStructured,
	OutputSchema: map[string]any{
		"type": "object",
		"properties": map[string]any{
			"name":  map[string]any{"type": "string"},
			"email": map[string]any{"type": "string"},
			"phone": map[string]any{"type": "string"},
		},
		"required": []string{"name"},
	},
})
```
## Tool Calling

Enable Iris tools in LLM nodes:
```go
// Register tools with the Iris provider
provider := openai.New(os.Getenv("OPENAI_API_KEY"))
provider.RegisterTool(iris.Tool{
	Name:        "search_database",
	Description: "Search the product database",
	Parameters: iris.Parameters{
		Type: "object",
		Properties: map[string]iris.Property{
			"query": {Type: "string", Description: "Search query"},
		},
		Required: []string{"query"},
	},
	Handler: func(ctx context.Context, params map[string]any) (any, error) {
		query := params["query"].(string)
		return searchProducts(ctx, query), nil
	},
})

client := irisadapter.NewProviderAdapter(provider)

// Create a node with tool access
llmNode := petalflow.NewLLMNode("assistant", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	SystemPrompt:   "You are a shopping assistant. Use search_database to find products.",
	PromptTemplate: "{{.Vars.user_query}}",
	OutputKey:      "response",
	Tools:          []string{"search_database"},
	ToolChoice:     petalflow.ToolChoiceAuto,
})
```
Section titled “Tool Choice Options”| Option | Description |
|---|---|
ToolChoiceAuto | Model decides when to use tools |
ToolChoiceRequired | Model must use at least one tool |
ToolChoiceNone | Disable tool use for this call |
ToolChoice("name") | Force use of specific tool |
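For example, to force the model to call `search_database` on every invocation (the node name here is illustrative):

```go
llmNode := petalflow.NewLLMNode("forced_search", client, petalflow.LLMNodeConfig{
	Model:          "gpt-4o",
	PromptTemplate: "{{.Vars.user_query}}",
	OutputKey:      "response",
	Tools:          []string{"search_database"},
	ToolChoice:     petalflow.ToolChoice("search_database"), // force this specific tool
})
```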
## Error Handling

### Retry Configuration

Configure retries for transient failures:
```go
client := irisadapter.NewProviderAdapter(provider, irisadapter.WithRetry(
	irisadapter.RetryConfig{
		MaxRetries:  3,
		InitialWait: 1 * time.Second,
		MaxWait:     30 * time.Second,
		Multiplier:  2.0,
		RetryOn:     []int{429, 500, 502, 503, 504},
	},
))
```
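With these settings, waits grow exponentially between attempts (roughly 1s, 2s, then 4s, doubling per `Multiplier` and capped at `MaxWait`), and retries fire only for the listed HTTP status codes.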
### Fallback Providers

Set up a fallback provider for high availability:
```go
primaryClient := irisadapter.NewProviderAdapter(
	openai.New(os.Getenv("OPENAI_API_KEY")),
)

fallbackClient := irisadapter.NewProviderAdapter(
	anthropic.New(os.Getenv("ANTHROPIC_API_KEY")),
)

client := irisadapter.NewFallbackAdapter(primaryClient, fallbackClient)
```
### Error Handling in Graphs

Route errors to dedicated handlers:
```go
g := petalflow.NewGraph("with-error-handling")

g.AddNode(petalflow.NewLLMNode("generate", client, generateConfig))

g.AddNode(petalflow.NewRuleRouter("error_check", petalflow.RuleRouterConfig{
	Routes: []petalflow.RouteRule{
		{When: petalflow.RouteCondition{Var: "llm_error", Op: petalflow.OpNotEmpty}, To: "error_handler"},
	},
	Default: "continue",
}))

g.AddNode(petalflow.NewTransformNode("error_handler", petalflow.TransformNodeConfig{
	Transform: func(inputs map[string]any) (any, error) {
		err := inputs["llm_error"].(error)
		log.Printf("LLM error: %v", err)
		return "I apologize, but I encountered an error. Please try again.", nil
	},
	OutputKey: "response",
}))
```
## Telemetry Integration

Combine Iris telemetry with PetalFlow events for full observability:
```go
// Iris telemetry hook
provider := openai.New(
	os.Getenv("OPENAI_API_KEY"),
	openai.WithTelemetry(func(event iris.TelemetryEvent) {
		log.Printf("[Iris] %s: model=%s tokens=%d latency=%v",
			event.Type, event.Model, event.TotalTokens, event.Latency)
	}),
)

client := irisadapter.NewProviderAdapter(provider)

// PetalFlow event handler
flowHandler := func(event petalflow.Event) {
	if event.Kind == petalflow.EventNodeEnd {
		log.Printf("[Flow] %s completed in %v", event.NodeID, event.Duration)
	}
}

// Run with both
runtime := petalflow.NewRuntime()
runtime.Run(ctx, graph, env, petalflow.RunOptions{EventHandler: flowHandler})
```
### Correlating Events

Use trace IDs to correlate Iris and PetalFlow events:
```go
// Generate trace ID
traceID := uuid.New().String()

// Pass to Iris
provider := openai.New(
	os.Getenv("OPENAI_API_KEY"),
	openai.WithRequestHeaders(map[string]string{
		"X-Trace-ID": traceID,
	}),
)

// Include in envelope
env := petalflow.NewEnvelope()
env.SetVar("trace_id", traceID)

// Log with trace ID
flowHandler := func(event petalflow.Event) {
	traceID := event.Data["envelope"].(*petalflow.Envelope).GetVar("trace_id")
	log.Printf("[%s] %s: %s", traceID, event.Kind, event.NodeID)
}
```