LLM Nodes With Iris

PetalFlow integrates with Iris to power LLM operations. The irisadapter package bridges Iris providers into PetalFlow nodes, giving you access to all Iris features while keeping orchestration logic in your graphs.

Install the adapter:
go get github.com/petal-labs/petalflow/irisadapter

The adapter wraps any Iris provider for use in PetalFlow nodes:

import (
    "github.com/petal-labs/iris/providers/openai"
    "github.com/petal-labs/petalflow/irisadapter"
)

provider := openai.New(os.Getenv("OPENAI_API_KEY"))
client := irisadapter.NewProviderAdapter(provider)

Create an LLM node with a provider adapter:

llmNode := petalflow.NewLLMNode("chat", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    SystemPrompt:   "You are a helpful assistant.",
    PromptTemplate: "{{.Vars.question}}",
    OutputKey:      "answer",
})
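To exercise the node, add it to a graph and provide the template variable on the envelope. A minimal sketch, assuming Run returns the final envelope and an error (the runtime API is shown in more detail below):

g := petalflow.NewGraph("qa")
g.AddNode(llmNode)

env := petalflow.NewEnvelope()
env.SetVar("question", "What is a directed acyclic graph?") // fills {{.Vars.question}}

runtime := petalflow.NewRuntime()
// Assumes Run returns (*petalflow.Envelope, error); check the runtime docs.
out, err := runtime.Run(ctx, g, env, petalflow.RunOptions{})
if err != nil {
    log.Fatal(err)
}
fmt.Println(out.GetVar("answer")) // response stored under OutputKey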
| Field | Type | Description |
|---|---|---|
| Model | string | Model identifier (provider-specific) |
| SystemPrompt | string | System message for the conversation |
| PromptTemplate | string | Go template for the user prompt |
| OutputKey | string | Envelope key for storing the response |
| Temperature | float64 | Sampling temperature (0.0-2.0) |
| MaxTokens | int | Maximum response tokens |
| TopP | float64 | Nucleus sampling parameter |
| StopSequences | []string | Sequences that stop generation |
| ResponseFormat | ResponseFormat | JSON mode or structured output |
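The optional sampling fields from the table combine freely; an illustrative sketch (the node name and values are examples, not recommendations):

llmNode := petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    PromptTemplate: "Summarize: {{.Vars.text}}",
    OutputKey:      "summary",
    Temperature:    0.2,              // low temperature for focused output
    MaxTokens:      256,              // cap the response length
    TopP:           0.9,              // nucleus sampling cutoff
    StopSequences:  []string{"\n\n"}, // stop at the first blank line
})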

Templates use Go’s text/template syntax with access to the envelope:

A template can reference several variables at once:

config := petalflow.LLMNodeConfig{
    PromptTemplate: `Analyze the following customer message:
Customer: {{.Vars.customer_name}}
Message: {{.Vars.message}}
Previous interactions: {{.Vars.interaction_count}}
Provide a sentiment analysis and suggested response.`,
}

Conditional sections render only when the variable is set:

config := petalflow.LLMNodeConfig{
    PromptTemplate: `{{if .Vars.context}}Context: {{.Vars.context}}
{{end}}Question: {{.Vars.question}}
{{if .Vars.format_instructions}}{{.Vars.format_instructions}}{{end}}`,
}

range iterates over a list, such as retrieved documents:

config := petalflow.LLMNodeConfig{
    PromptTemplate: `Based on these search results:
{{range $i, $doc := .Vars.documents}}
[{{$i}}] {{$doc.title}}
{{$doc.content}}
{{end}}
Answer the question: {{.Vars.query}}`,
}

The envelope's message history is also available, as .Messages:

config := petalflow.LLMNodeConfig{
    PromptTemplate: `Previous conversation:
{{range .Messages}}{{.Role}}: {{.Content}}
{{end}}
User: {{.Vars.user_input}}`,
}
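To sanity-check a prompt template outside a graph, you can render it with the standard text/template package directly. A minimal sketch, assuming the node exposes envelope variables to the template under .Vars as a map:

package main

import (
    "os"
    "text/template"
)

func main() {
    // Same syntax the node uses for PromptTemplate.
    tmpl := template.Must(template.New("prompt").Parse(
        `{{if .Vars.context}}Context: {{.Vars.context}}
{{end}}Question: {{.Vars.question}}`))

    data := map[string]any{
        "Vars": map[string]any{"question": "What is PetalFlow?"},
    }
    // With no "context" variable set, the conditional section is skipped.
    if err := tmpl.Execute(os.Stdout, data); err != nil {
        panic(err)
    }
}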

Enable streaming for real-time output:

The simplest approach handles each token in a callback:

llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    PromptTemplate: "{{.Vars.prompt}}",
    OutputKey:      "response",
    Streaming:      true,
    OnToken: func(token string) {
        fmt.Print(token) // print tokens as they arrive
    },
})

To hand tokens to a concurrent consumer, forward them through a channel and close it when generation completes:

tokens := make(chan string, 100)

llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    PromptTemplate: "{{.Vars.prompt}}",
    OutputKey:      "response",
    Streaming:      true,
    OnToken: func(token string) {
        tokens <- token
    },
    OnComplete: func() {
        close(tokens)
    },
})

// Consumer goroutine
go func() {
    for token := range tokens {
        // Process tokens (e.g., send to WebSocket client)
        websocket.Send(token)
    }
}()

Collect the full response while also streaming:

var fullResponse strings.Builder

llmNode := petalflow.NewLLMNode("stream_chat", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    PromptTemplate: "{{.Vars.prompt}}",
    OutputKey:      "response",
    Streaming:      true,
    OnToken: func(token string) {
        fullResponse.WriteString(token)
        streamToClient(token)
    },
})
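After the run completes, fullResponse.String() holds the complete text, and the same response is stored in the envelope under the response key.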

Use different providers for different tasks in the same graph:

// Fast model for classification
classifyClient := irisadapter.NewProviderAdapter(
    openai.New(os.Getenv("OPENAI_API_KEY")),
)

// Powerful model for generation
generateClient := irisadapter.NewProviderAdapter(
    anthropic.New(os.Getenv("ANTHROPIC_API_KEY")),
)

// Local model for embeddings
embedClient := irisadapter.NewProviderAdapter(
    ollama.New(ollama.WithBaseURL("http://localhost:11434")),
)

// Build a graph with multiple providers
g := petalflow.NewGraph("multi-provider")

g.AddNode(petalflow.NewLLMNode("classify", classifyClient, petalflow.LLMNodeConfig{
    Model:          "gpt-4o-mini",
    PromptTemplate: "Classify this text: {{.Vars.input}}",
    OutputKey:      "classification",
}))

g.AddNode(petalflow.NewLLMNode("generate", generateClient, petalflow.LLMNodeConfig{
    Model:          "claude-3-5-sonnet-20241022",
    PromptTemplate: "Based on classification {{.Vars.classification}}, generate: ...",
    OutputKey:      "response",
}))

g.AddNode(petalflow.NewLLMNode("embed", embedClient, petalflow.LLMNodeConfig{
    Model:          "nomic-embed-text",
    PromptTemplate: "{{.Vars.response}}",
    OutputKey:      "embedding",
}))

Request JSON-formatted responses:

llmNode := petalflow.NewLLMNode("extract", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    SystemPrompt:   "Extract entities from the text. Respond in JSON format.",
    PromptTemplate: "Text: {{.Vars.text}}",
    OutputKey:      "entities",
    ResponseFormat: petalflow.ResponseFormatJSON,
})
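The reply lands in the envelope under the OutputKey. A sketch of decoding it with encoding/json, assuming the node stores the raw response as a string (the struct fields here are illustrative, not a fixed schema):

// Hypothetical shape for the extracted entities.
type Entities struct {
    People []string `json:"people"`
    Places []string `json:"places"`
}

raw, ok := env.GetVar("entities").(string) // assumes string storage
if !ok {
    log.Fatal("entities is not a string")
}
var entities Entities
if err := json.Unmarshal([]byte(raw), &entities); err != nil {
    log.Fatalf("decode entities: %v", err)
}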

For providers that support it, define an output schema:

llmNode := petalflow.NewLLMNode("extract", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    PromptTemplate: "Extract contact info from: {{.Vars.text}}",
    OutputKey:      "contact",
    ResponseFormat: petalflow.ResponseFormatStructured,
    OutputSchema: map[string]any{
        "type": "object",
        "properties": map[string]any{
            "name":  map[string]any{"type": "string"},
            "email": map[string]any{"type": "string"},
            "phone": map[string]any{"type": "string"},
        },
        "required": []string{"name"},
    },
})

Enable Iris tools in LLM nodes:

// Register tools with the Iris client
provider := openai.New(os.Getenv("OPENAI_API_KEY"))
provider.RegisterTool(iris.Tool{
    Name:        "search_database",
    Description: "Search the product database",
    Parameters: iris.Parameters{
        Type: "object",
        Properties: map[string]iris.Property{
            "query": {Type: "string", Description: "Search query"},
        },
        Required: []string{"query"},
    },
    Handler: func(ctx context.Context, params map[string]any) (any, error) {
        query := params["query"].(string)
        return searchProducts(ctx, query), nil
    },
})

client := irisadapter.NewProviderAdapter(provider)

// Create a node with tool access
llmNode := petalflow.NewLLMNode("assistant", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o",
    SystemPrompt:   "You are a shopping assistant. Use search_database to find products.",
    PromptTemplate: "{{.Vars.user_query}}",
    OutputKey:      "response",
    Tools:          []string{"search_database"},
    ToolChoice:     petalflow.ToolChoiceAuto,
})
| Option | Description |
|---|---|
| ToolChoiceAuto | Model decides when to use tools |
| ToolChoiceRequired | Model must use at least one tool |
| ToolChoiceNone | Disable tool use for this call |
| ToolChoice("name") | Force use of a specific tool |

Configure retries for transient failures. With the settings below, the adapter waits 1s, then 2s, then 4s between attempts (doubling each time, capped at 30s) and retries only on the listed HTTP status codes:

client := irisadapter.NewProviderAdapter(provider, irisadapter.WithRetry(
    irisadapter.RetryConfig{
        MaxRetries:  3,
        InitialWait: 1 * time.Second,
        MaxWait:     30 * time.Second,
        Multiplier:  2.0,
        RetryOn:     []int{429, 500, 502, 503, 504},
    },
))

Set up a fallback provider for high availability:

primaryClient := irisadapter.NewProviderAdapter(
    openai.New(os.Getenv("OPENAI_API_KEY")),
)
fallbackClient := irisadapter.NewProviderAdapter(
    anthropic.New(os.Getenv("ANTHROPIC_API_KEY")),
)
client := irisadapter.NewFallbackAdapter(primaryClient, fallbackClient)
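The fallback adapter satisfies the same client interface as a provider adapter, so it can be passed to NewLLMNode unchanged; presumably the primary client is tried first and the fallback is used only when the primary returns an error.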

Route errors to dedicated handlers:

g := petalflow.NewGraph("with-error-handling")
g.AddNode(petalflow.NewLLMNode("generate", client, generateConfig))
g.AddNode(petalflow.NewRuleRouter("error_check", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
{When: petalflow.RouteCondition{Var: "llm_error", Op: petalflow.OpNotEmpty}, To: "error_handler"},
},
Default: "continue",
}))
g.AddNode(petalflow.NewTransformNode("error_handler", petalflow.TransformNodeConfig{
Transform: func(inputs map[string]any) (any, error) {
err := inputs["llm_error"].(error)
log.Printf("LLM error: %v", err)
return "I apologize, but I encountered an error. Please try again.", nil
},
OutputKey: "response",
}))

Combine Iris telemetry with PetalFlow events for full observability:

// Iris telemetry hook
provider := openai.New(
os.Getenv("OPENAI_API_KEY"),
openai.WithTelemetry(func(event iris.TelemetryEvent) {
log.Printf("[Iris] %s: model=%s tokens=%d latency=%v",
event.Type, event.Model, event.TotalTokens, event.Latency)
}),
)
client := irisadapter.NewProviderAdapter(provider)
// PetalFlow event handler
flowHandler := func(event petalflow.Event) {
if event.Kind == petalflow.EventNodeEnd {
log.Printf("[Flow] %s completed in %v", event.NodeID, event.Duration)
}
}
// Run with both
runtime := petalflow.NewRuntime()
runtime.Run(ctx, graph, env, petalflow.RunOptions{EventHandler: flowHandler})

Use trace IDs to correlate Iris and PetalFlow events:

// Generate a trace ID
traceID := uuid.New().String()

// Pass it to Iris
provider := openai.New(
    os.Getenv("OPENAI_API_KEY"),
    openai.WithRequestHeaders(map[string]string{
        "X-Trace-ID": traceID,
    }),
)

// Include it in the envelope
env := petalflow.NewEnvelope()
env.SetVar("trace_id", traceID)

// Log with the trace ID
flowHandler := func(event petalflow.Event) {
    traceID := event.Data["envelope"].(*petalflow.Envelope).GetVar("trace_id")
    log.Printf("[%s] %s: %s", traceID, event.Kind, event.NodeID)
}