PetalFlow Nodes API
PetalFlow provides a library of built-in nodes for common workflow operations. Each node type handles a specific concern: LLM calls, routing, transformation, validation, or control flow.
Built-in Node Reference
| Node | Category | Purpose |
|---|---|---|
| LLMNode | Reasoning | Call an LLM with a prompt template |
| ToolNode | Integration | Execute tool functions and external APIs |
| RuleRouter | Routing | Route based on conditional rules |
| LLMRouter | Routing | Route using LLM classification |
| FilterNode | Processing | Filter lists by criteria |
| TransformNode | Processing | Transform data in envelopes |
| MergeNode | Control Flow | Merge parallel branches |
| GuardianNode | Validation | Validate inputs and enforce constraints |
| CacheNode | Optimization | Cache expensive operations |
| HumanNode | Control Flow | Human-in-the-loop approvals |
LLMNode
Calls a language model with a templated prompt. The response is stored in the envelope under the configured output key. PetalFlow v0.2.0 adds full support for multi-turn tool use workflows.
Configuration

| Field | Type | Description |
|---|---|---|
| Model | string | Model identifier (e.g., gpt-4o, claude-3-opus) |
| PromptTemplate | string | Go template for the prompt. Access envelope via {{.Vars.key}} |
| SystemPrompt | string | Optional system message (Chat Completions API style) |
| Instructions | string | Optional system instructions (Responses API style) |
| OutputKey | string | Envelope key to store the response |
| Temperature | float64 | Sampling temperature (0.0-2.0) |
| MaxTokens | int | Maximum tokens in response |
| StopSequences | []string | Sequences that stop generation |
Example
```go
node := petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{
	Model:        "gpt-4o",
	SystemPrompt: "You are a technical writer. Be concise and accurate.",
	PromptTemplate: `Summarize the following document in 3 bullet points:

{{.Vars.document}}`,
	OutputKey:   "summary",
	Temperature: 0.3,
	MaxTokens:   500,
})
```

Template Variables
Access envelope data in templates:

```go
// Access variables
"{{.Vars.query}}"
"{{.Vars.context}}"

// Access message history
"{{range .Messages}}{{.Role}}: {{.Content}}\n{{end}}"

// Conditional content
"{{if .Vars.include_examples}}Examples: {{.Vars.examples}}{{end}}"
```

LLM Types (v0.2.0)
PetalFlow v0.2.0 introduces new types for multi-turn tool use workflows:
LLMMessage
Messages in PetalFlow conversations:

```go
type LLMMessage struct {
	Role        string          // "system", "user", "assistant", "tool"
	Content     string          // Message content
	Name        string          // Optional: tool name, agent role
	ToolCalls   []LLMToolCall   // For assistant messages with pending tool calls
	ToolResults []LLMToolResult // For tool result messages (Role="tool")
	Meta        map[string]any  // Optional metadata
}
```

LLMToolCall
Represents a tool invocation requested by the model:

```go
type LLMToolCall struct {
	ID        string
	Name      string
	Arguments map[string]any
}
```

LLMToolResult
Represents the result of executing a tool:

```go
type LLMToolResult struct {
	CallID  string // Must match LLMToolCall.ID from the response
	Content any    // Result data (will be JSON marshaled by the adapter)
	IsError bool   // True if this represents an error result
}
```

LLMReasoningOutput
Contains reasoning information from models that support it (e.g., o1, o3):

```go
type LLMReasoningOutput struct {
	ID      string   // Reasoning output identifier
	Summary []string // Reasoning summary points
}
```

LLMResponse
The response from an LLM call:

```go
type LLMResponse struct {
	Text      string              // Raw text output
	JSON      map[string]any      // Parsed JSON if structured output requested
	Messages  []LLMMessage        // Conversation messages including response
	Usage     LLMTokenUsage       // Token consumption
	Provider  string              // Provider ID that handled the request
	Model     string              // Model that generated the response
	ToolCalls []LLMToolCall       // Tool calls requested by the model
	Reasoning *LLMReasoningOutput // Reasoning output (optional)
	Status    string              // Response status (optional)
	Meta      map[string]any      // Additional response metadata
}
```

Multi-Turn Tool Use Example
```go
// Build a workflow that handles tool calls
g := petalflow.NewGraph("tool-workflow")

// LLM node that may request tool calls
llmNode := petalflow.NewLLMNode("agent", client, petalflow.LLMNodeConfig{
	Model:        "gpt-4o",
	SystemPrompt: "You are a helpful assistant with access to tools.",
	OutputKey:    "agent_response",
})

// Tool node to execute requested tools
toolNode := petalflow.NewToolNode("executor", petalflow.ToolNodeConfig{
	ToolName:  "search",
	OutputKey: "tool_result",
})

// Router to check if tool calls are needed
router := petalflow.NewRuleRouter("check_tools", petalflow.RuleRouterConfig{
	Routes: []petalflow.RouteRule{
		{
			When: petalflow.RouteCondition{
				Var: "agent_response.tool_calls",
				Op:  petalflow.OpNotEmpty,
			},
			To: "executor",
		},
	},
	Default: "output",
})

g.AddNode(llmNode)
g.AddNode(toolNode)
g.AddNode(router)
g.AddEdge("agent", "check_tools")
g.AddEdge("executor", "agent") // Loop back with tool results
```

ToolNode
Executes external tools, APIs, or functions. Tools are registered with the Iris client and called by name.
Configuration

| Field | Type | Description |
|---|---|---|
| ToolName | string | Name of the registered tool to call |
| InputMapping | map[string]string | Map envelope keys to tool parameters |
| OutputKey | string | Envelope key to store the result |
| Timeout | time.Duration | Maximum execution time |
| RetryPolicy | RetryPolicy | Retry configuration for failures |
Example
```go
// Register tool with Iris client
client.RegisterTool(iris.Tool{
	Name:        "search_knowledge_base",
	Description: "Search the knowledge base for relevant documents",
	Parameters: iris.Parameters{
		Type: "object",
		Properties: map[string]iris.Property{
			"query": {Type: "string", Description: "Search query"},
			"limit": {Type: "integer", Description: "Max results"},
		},
		Required: []string{"query"},
	},
	Handler: searchHandler,
})

// Create node that calls the tool
node := petalflow.NewToolNode("search", petalflow.ToolNodeConfig{
	ToolName: "search_knowledge_base",
	InputMapping: map[string]string{
		"query": "user_query", // tool param "query" reads envelope key "user_query"
		"limit": "result_limit",
	},
	OutputKey: "search_results",
	Timeout:   30 * time.Second,
})
```

RuleRouter
Routes execution to different nodes based on conditional rules. Rules are evaluated in order; the first match determines the route.
Configuration

| Field | Type | Description |
|---|---|---|
| Routes | []RouteRule | Ordered list of routing rules |
| Default | string | Fallback node if no rules match |

RouteRule Fields

| Field | Type | Description |
|---|---|---|
| When | RouteCondition | Condition to evaluate |
| To | string | Target node ID when condition matches |

Operators

| Operator | Description |
|---|---|
| OpEquals | Exact equality |
| OpNotEquals | Not equal |
| OpContains | String contains substring |
| OpStartsWith | String starts with prefix |
| OpEndsWith | String ends with suffix |
| OpGt, OpGte | Greater than (or equal) |
| OpLt, OpLte | Less than (or equal) |
| OpIn | Value in list |
| OpNotIn | Value not in list |
| OpEmpty | Value is empty/nil |
| OpNotEmpty | Value is not empty |
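The first-match semantics described above (rules evaluated in order, with a default when nothing matches) can be sketched in plain Go. The `rule`, `op`, `matches`, and `route` names below are hypothetical simplifications covering only three operators; they are not the PetalFlow types, and the real operator implementations may treat types differently.

```go
package main

import (
	"fmt"
	"reflect"
)

type op int

const (
	opEquals op = iota
	opGte
	opIn
)

// rule is a simplified stand-in for a routing rule: condition plus target.
type rule struct {
	Var   string
	Op    op
	Value any
	To    string
}

// matches reports whether a single rule's condition holds for vars.
func matches(r rule, vars map[string]any) bool {
	v, ok := vars[r.Var]
	if !ok {
		return false
	}
	switch r.Op {
	case opEquals:
		return reflect.DeepEqual(v, r.Value)
	case opGte:
		a, aok := v.(float64)
		b, bok := r.Value.(float64)
		return aok && bok && a >= b
	case opIn:
		s, sok := v.(string)
		list, lok := r.Value.([]string)
		if !sok || !lok {
			return false
		}
		for _, item := range list {
			if item == s {
				return true
			}
		}
	}
	return false
}

// route returns the target of the first matching rule, or def if none match.
func route(rules []rule, def string, vars map[string]any) string {
	for _, r := range rules {
		if matches(r, vars) {
			return r.To
		}
	}
	return def
}

func main() {
	rules := []rule{
		{Var: "priority", Op: opEquals, Value: "high", To: "urgent_handler"},
		{Var: "customer_tier", Op: opIn, Value: []string{"platinum", "gold"}, To: "vip_handler"},
		{Var: "order_total", Op: opGte, Value: 10000.0, To: "review_handler"},
	}
	// Both the first and third rules match here; the first one wins.
	vars := map[string]any{"priority": "high", "order_total": 20000.0}
	fmt.Println(route(rules, "standard_handler", vars))
}
```

Because evaluation stops at the first match, rule order acts as priority: more specific or more urgent rules belong earlier in the list.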
Example
```go
router := petalflow.NewRuleRouter("priority_router", petalflow.RuleRouterConfig{
	Routes: []petalflow.RouteRule{
		// High priority tickets go to urgent handler
		{
			When: petalflow.RouteCondition{
				Var:   "priority",
				Op:    petalflow.OpEquals,
				Value: "high",
			},
			To: "urgent_handler",
		},
		// VIP customers get dedicated support
		{
			When: petalflow.RouteCondition{
				Var:   "customer_tier",
				Op:    petalflow.OpIn,
				Value: []string{"platinum", "gold"},
			},
			To: "vip_handler",
		},
		// Large orders need review
		{
			When: petalflow.RouteCondition{
				Var:   "order_total",
				Op:    petalflow.OpGte,
				Value: 10000,
			},
			To: "review_handler",
		},
	},
	Default: "standard_handler",
})
```

LLMRouter
Routes execution based on LLM classification. The model analyzes input and selects from predefined categories.
Configuration

| Field | Type | Description |
|---|---|---|
| Model | string | Model to use for classification |
| Categories | []Category | Available routing categories |
| PromptTemplate | string | Template for classification prompt |
| InputKey | string | Envelope key containing input to classify |

Category Fields

| Field | Type | Description |
|---|---|---|
| Name | string | Category identifier (used as node ID target) |
| Description | string | Description to help the LLM classify |
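Since a category `Name` doubles as the target node ID, the model's free-text answer has to be matched back to one of the configured names. The sketch below shows one way this could be done; `resolveCategory` is a hypothetical helper, and both the normalization and the fallback category are assumptions, as this page does not say how PetalFlow handles an unrecognized answer.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveCategory maps raw model output to one of the known category names.
// It trims whitespace and surrounding punctuation, compares case-insensitively,
// and returns fallback when no category matches exactly.
func resolveCategory(raw string, categories []string, fallback string) string {
	answer := strings.ToLower(strings.TrimSpace(raw))
	answer = strings.Trim(answer, ".\"'`")
	for _, c := range categories {
		if answer == strings.ToLower(c) {
			return c
		}
	}
	return fallback
}

func main() {
	categories := []string{"billing", "technical", "sales", "general"}
	fmt.Println(resolveCategory(" Billing.\n", categories, "general"))
	fmt.Println(resolveCategory("I think it's technical", categories, "general"))
}
```

Requiring an exact match after normalization is deliberately strict: a verbose answer falls through to the fallback rather than being guessed at.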
Example
```go
router := petalflow.NewLLMRouter("intent_classifier", client, petalflow.LLMRouterConfig{
	Model:    "gpt-4o-mini",
	InputKey: "user_message",
	Categories: []petalflow.Category{
		{Name: "billing", Description: "Questions about invoices, payments, refunds, or pricing"},
		{Name: "technical", Description: "Technical issues, bugs, errors, or how-to questions"},
		{Name: "sales", Description: "Product inquiries, upgrades, or new purchases"},
		{Name: "general", Description: "General questions, feedback, or other topics"},
	},
	PromptTemplate: `Classify the following customer message into exactly one category.

Message: {{.Vars.user_message}}

Categories:
{{range .Categories}}- {{.Name}}: {{.Description}}
{{end}}
Respond with only the category name.`,
})
```

FilterNode
Filters a list in the envelope based on criteria. Useful for narrowing search results, removing invalid entries, or selecting top items.
Configuration

| Field | Type | Description |
|---|---|---|
| InputKey | string | Envelope key containing the list to filter |
| OutputKey | string | Envelope key for filtered results |
| Criteria | []FilterCriterion | Filter conditions (all must pass) |
| Limit | int | Maximum items to return (0 = no limit) |
| SortBy | string | Field to sort by before limiting |
| SortDesc | bool | Sort descending (default ascending) |

FilterCriterion Fields

| Field | Type | Description |
|---|---|---|
| Field | string | Field name to check on each item |
| Op | FilterOp | Comparison operator |
| Value | any | Value to compare against |
Example
```go
// Filter search results to high-relevance, recent documents
filter := petalflow.NewFilterNode("filter_results", petalflow.FilterNodeConfig{
	InputKey:  "search_results",
	OutputKey: "filtered_results",
	Criteria: []petalflow.FilterCriterion{
		{Field: "relevance_score", Op: petalflow.FilterGte, Value: 0.7},
		{Field: "published_date", Op: petalflow.FilterGte, Value: "2024-01-01"},
		{Field: "status", Op: petalflow.FilterEquals, Value: "published"},
	},
	SortBy:   "relevance_score",
	SortDesc: true,
	Limit:    5,
})
```

TransformNode
Transforms data in the envelope using Go functions or templates. Use for parsing, formatting, restructuring, or computing derived values.
Configuration

| Field | Type | Description |
|---|---|---|
| InputKeys | []string | Envelope keys to read |
| OutputKey | string | Envelope key for result |
| Transform | TransformFunc | Function that performs the transformation |
| Template | string | Alternative: use a Go template for simple transforms |
Example: Function Transform
```go
transform := petalflow.NewTransformNode("parse_json", petalflow.TransformNodeConfig{
	InputKeys: []string{"raw_response"},
	OutputKey: "parsed_data",
	Transform: func(inputs map[string]any) (any, error) {
		// Comma-ok assertion: a missing or non-string input returns an error instead of panicking
		raw, ok := inputs["raw_response"].(string)
		if !ok {
			return nil, fmt.Errorf("raw_response must be a string, got %T", inputs["raw_response"])
		}

		var data map[string]any
		if err := json.Unmarshal([]byte(raw), &data); err != nil {
			return nil, fmt.Errorf("invalid JSON: %w", err)
		}

		// Extract and restructure
		return map[string]any{
			"title":   data["name"],
			"content": data["body"],
			"tags":    data["labels"],
		}, nil
	},
})
```

Example: Template Transform

```go
transform := petalflow.NewTransformNode("format_response", petalflow.TransformNodeConfig{
	InputKeys: []string{"summary", "sources"},
	OutputKey: "formatted_response",
	Template: `## Summary
{{.Vars.summary}}

## Sources
{{range .Vars.sources}}- {{.title}}: {{.url}}
{{end}}`,
})
```

MergeNode
Waits for multiple parallel branches to complete, then merges their results into a single envelope value.
Configuration

| Field | Type | Description |
|---|---|---|
| InputKeys | []string | Envelope keys to merge (one per branch) |
| OutputKey | string | Envelope key for merged result |
| MergeStyle | MergeStyle | How to combine values |
| MergeFunc | MergeFunc | Custom merge function (optional) |

Merge Styles

| Style | Description |
|---|---|
| MergeStyleArray | Collect all values into an array |
| MergeStyleObject | Combine into object with input keys as fields |
| MergeStyleConcat | Concatenate string values |
| MergeStyleFirst | Use first non-nil value |
| MergeStyleCustom | Use provided MergeFunc |
Example
```go
// Merge parallel analysis results into a single object
merge := petalflow.NewMergeNode("combine_analysis", petalflow.MergeNodeConfig{
	InputKeys:  []string{"sentiment_result", "entity_result", "summary_result"},
	OutputKey:  "full_analysis",
	MergeStyle: petalflow.MergeStyleObject,
})

// Result structure:
// {
//   "sentiment_result": {...},
//   "entity_result": {...},
//   "summary_result": {...}
// }
```

Custom Merge Function

```go
merge := petalflow.NewMergeNode("weighted_merge", petalflow.MergeNodeConfig{
	InputKeys:  []string{"score_a", "score_b", "score_c"},
	OutputKey:  "final_score",
	MergeStyle: petalflow.MergeStyleCustom,
	MergeFunc: func(values map[string]any) (any, error) {
		a := values["score_a"].(float64)
		b := values["score_b"].(float64)
		c := values["score_c"].(float64)
		// Weighted average
		return (a*0.5 + b*0.3 + c*0.2), nil
	},
})
```

GuardianNode
Validates envelope data against constraints. Guards can reject invalid data, log warnings, or trigger alternative paths.
Configuration

| Field | Type | Description |
|---|---|---|
| Checks | []GuardCheck | Validation rules to apply |
| OnFail | GuardAction | Action when validation fails |
| FailRoute | string | Node to route to on failure (if action is Route) |
| ErrorKey | string | Envelope key to store error details |

Guard Actions

| Action | Description |
|---|---|
| GuardActionReject | Stop execution with error |
| GuardActionWarn | Log warning and continue |
| GuardActionRoute | Route to failure handling node |
| GuardActionSkip | Skip remaining nodes in current branch |
Example
```go
guard := petalflow.NewGuardianNode("input_validator", petalflow.GuardianNodeConfig{
	Checks: []petalflow.GuardCheck{
		// Required field check
		{
			Var:     "query",
			Op:      petalflow.OpNotEmpty,
			Message: "Query is required",
		},
		// Length constraint
		{
			Var:     "query",
			Op:      petalflow.OpMaxLength,
			Value:   10000,
			Message: "Query exceeds maximum length of 10000 characters",
		},
		// Format validation
		{
			Var:     "email",
			Op:      petalflow.OpMatches,
			Value:   `^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`,
			Message: "Invalid email format",
		},
		// Numeric range
		{
			Var:     "quantity",
			Op:      petalflow.OpBetween,
			Value:   []int{1, 100},
			Message: "Quantity must be between 1 and 100",
		},
	},
	OnFail:    petalflow.GuardActionRoute,
	FailRoute: "validation_error_handler",
	ErrorKey:  "validation_errors",
})
```

CacheNode
Caches the results of expensive operations. Subsequent executions with the same cache key return cached values without re-execution.
Configuration

| Field | Type | Description |
|---|---|---|
| CacheKey | string | Template for generating cache keys |
| Store | CacheStore | Cache backend (memory, Redis, etc.) |
| TTL | time.Duration | Time-to-live for cached entries |
| InputKeys | []string | Envelope keys to cache |
| Enabled | bool | Enable/disable caching |
Example
```go
// Cache expensive LLM embeddings
cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{
	CacheKey:  "embedding:{{.Vars.document_id}}:{{.Vars.model}}",
	Store:     redisStore,
	TTL:       24 * time.Hour,
	InputKeys: []string{"embedding_vector"},
	Enabled:   true,
})

// Use with an LLM node
g.AddNode(petalflow.NewLLMNode("generate_embedding", client, embeddingConfig))
g.AddNode(cache)
g.AddEdge("generate_embedding", "embedding_cache")
```

Cache Store Interface

```go
type CacheStore interface {
	Get(ctx context.Context, key string) (any, bool, error)
	Set(ctx context.Context, key string, value any, ttl time.Duration) error
	Delete(ctx context.Context, key string) error
}

// Built-in stores (distinct names so both declarations compile in one scope)
memoryStore := petalflow.NewMemoryCacheStore()          // In-memory (single instance)
redisStore := petalflow.NewRedisCacheStore(redisClient) // Redis (distributed)
```

HumanNode
Pauses execution for human review and approval. Integrates with external approval systems via callbacks or webhooks.
Configuration

| Field | Type | Description |
|---|---|---|
| ReviewKey | string | Envelope key containing data for review |
| ApprovalKey | string | Envelope key to store approval decision |
| Callback | HumanCallback | Function called when review is needed |
| Timeout | time.Duration | Maximum wait time for approval |
| OnTimeout | TimeoutAction | Action if approval times out |
Example
```go
human := petalflow.NewHumanNode("manager_approval", petalflow.HumanNodeConfig{
	ReviewKey:   "refund_request",
	ApprovalKey: "approval_decision",
	Timeout:     48 * time.Hour,
	OnTimeout:   petalflow.TimeoutActionReject,
	Callback: func(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
		request := env.GetVar("refund_request")

		// Send to external approval system
		ticketID, err := approvalSystem.CreateTicket(ctx, ApprovalTicket{
			Type:    "refund",
			Data:    request,
			Urgency: "normal",
		})
		if err != nil {
			return petalflow.ApprovalResult{}, err
		}

		// Wait for decision (approval system calls back)
		decision, err := approvalSystem.WaitForDecision(ctx, ticketID)
		if err != nil {
			return petalflow.ApprovalResult{}, err
		}

		return petalflow.ApprovalResult{
			Approved: decision.Approved,
			Reviewer: decision.ReviewerID,
			Notes:    decision.Comments,
		}, nil
	},
})
```

Using with Step Controllers
For interactive approval in development or testing:

```go
controller := petalflow.NewBreakpointStepController([]string{"manager_approval"})
opts := petalflow.NewStepRunOptions(controller)

// Graph pauses at HumanNode, allowing inspection and manual approval
result, err := runtime.Run(ctx, graph, env, opts)
```

Custom Nodes
Create custom nodes by implementing the Node interface:

```go
type Node interface {
	ID() string
	Execute(ctx context.Context, env *Envelope) error
}

// Example custom node
type CustomAggregator struct {
	id        string
	threshold float64
}

func NewCustomAggregator(id string, threshold float64) *CustomAggregator {
	return &CustomAggregator{id: id, threshold: threshold}
}

func (n *CustomAggregator) ID() string {
	return n.id
}

func (n *CustomAggregator) Execute(ctx context.Context, env *petalflow.Envelope) error {
	// Guard the type assertion and the empty case so bad input returns an error
	scores, ok := env.GetVar("scores").([]float64)
	if !ok || len(scores) == 0 {
		return fmt.Errorf("node %s: scores must be a non-empty []float64", n.id)
	}

	var sum float64
	for _, s := range scores {
		sum += s
	}
	avg := sum / float64(len(scores))

	env.SetVar("average_score", avg)
	env.SetVar("above_threshold", avg >= n.threshold)

	return nil
}
```

Register and use in graphs:

```go
g.AddNode(NewCustomAggregator("score_aggregator", 0.75))
g.AddEdge("collect_scores", "score_aggregator")
```