PetalFlow Nodes API

PetalFlow provides a library of built-in nodes for common workflow operations. Each node type handles a specific concern: LLM calls, routing, transformation, validation, or control flow.

| Node | Category | Purpose |
| --- | --- | --- |
| LLMNode | Reasoning | Call an LLM with a prompt template |
| ToolNode | Integration | Execute tool functions and external APIs |
| RuleRouter | Routing | Route based on conditional rules |
| LLMRouter | Routing | Route using LLM classification |
| FilterNode | Processing | Filter lists by criteria |
| TransformNode | Processing | Transform data in envelopes |
| MergeNode | Control Flow | Merge parallel branches |
| GuardianNode | Validation | Validate inputs and enforce constraints |
| CacheNode | Optimization | Cache expensive operations |
| HumanNode | Control Flow | Human-in-the-loop approvals |
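
Every node follows the same pattern: construct it, add it to a graph, and connect it with edges. A minimal sketch (node IDs, envelope keys, and the prompt below are illustrative; execution via runtime.Run is shown near the end of this page):

// Sketch: wire two documented node types into a graph.
g := petalflow.NewGraph("support-pipeline")

g.AddNode(petalflow.NewLLMNode("classify", client, petalflow.LLMNodeConfig{
    Model:          "gpt-4o-mini",
    PromptTemplate: "Classify this ticket: {{.Vars.ticket}}",
    OutputKey:      "classification",
}))
g.AddNode(petalflow.NewTransformNode("format", petalflow.TransformNodeConfig{
    InputKeys: []string{"classification"},
    OutputKey: "response",
    Template:  "Category: {{.Vars.classification}}",
}))

// Edges define execution order by node ID
g.AddEdge("classify", "format")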

LLMNode calls a language model with a templated prompt. The response is stored in the envelope under the configured output key. PetalFlow v0.2.0 adds full support for multi-turn tool use workflows.

| Field | Type | Description |
| --- | --- | --- |
| Model | string | Model identifier (e.g., gpt-4o, claude-3-opus) |
| PromptTemplate | string | Go template for the prompt. Access envelope via {{.Vars.key}} |
| SystemPrompt | string | Optional system message (Chat Completions API style) |
| Instructions | string | Optional system instructions (Responses API style) |
| OutputKey | string | Envelope key to store the response |
| Temperature | float64 | Sampling temperature (0.0-2.0) |
| MaxTokens | int | Maximum tokens in response |
| StopSequences | []string | Sequences that stop generation |
node := petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{
    Model:        "gpt-4o",
    SystemPrompt: "You are a technical writer. Be concise and accurate.",
    PromptTemplate: `Summarize the following document in 3 bullet points:
{{.Vars.document}}`,
    OutputKey:   "summary",
    Temperature: 0.3,
    MaxTokens:   500,
})

Access envelope data in templates:

// Access variables
"{{.Vars.query}}"
"{{.Vars.context}}"
// Access message history
"{{range .Messages}}{{.Role}}: {{.Content}}\n{{end}}"
// Conditional content
"{{if .Vars.include_examples}}Examples: {{.Vars.examples}}{{end}}"

PetalFlow v0.2.0 introduces new types for multi-turn tool use workflows:

LLMMessage represents a message in a PetalFlow conversation:

type LLMMessage struct {
    Role        string          // "system", "user", "assistant", "tool"
    Content     string          // Message content
    Name        string          // Optional: tool name, agent role
    ToolCalls   []LLMToolCall   // For assistant messages with pending tool calls
    ToolResults []LLMToolResult // For tool result messages (Role="tool")
    Meta        map[string]any  // Optional metadata
}

LLMToolCall represents a tool invocation requested by the model:

type LLMToolCall struct {
    ID        string
    Name      string
    Arguments map[string]any
}

LLMToolResult represents the result of executing a tool:

type LLMToolResult struct {
    CallID  string // Must match LLMToolCall.ID from the response
    Content any    // Result data (will be JSON marshaled by the adapter)
    IsError bool   // True if this represents an error result
}
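
As a sketch of the multi-turn loop at the message level, each pending LLMToolCall on an assistant message is answered with an LLMToolResult whose CallID matches the call's ID, carried in a message with Role "tool". Types are written unqualified, as in the listings above; executeTool is a hypothetical helper standing in for your own tool logic:

// Sketch: answer every pending tool call from an assistant message.
func answerToolCalls(assistant LLMMessage) LLMMessage {
    results := make([]LLMToolResult, 0, len(assistant.ToolCalls))
    for _, call := range assistant.ToolCalls {
        out, err := executeTool(call.Name, call.Arguments) // hypothetical helper
        results = append(results, LLMToolResult{
            CallID:  call.ID, // must match the requesting LLMToolCall.ID
            Content: out,
            IsError: err != nil,
        })
    }
    return LLMMessage{Role: "tool", ToolResults: results}
}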

LLMReasoningOutput contains reasoning information from models that support it (e.g., o1, o3):

type LLMReasoningOutput struct {
    ID      string   // Reasoning output identifier
    Summary []string // Reasoning summary points
}

LLMResponse is the response from an LLM call:

type LLMResponse struct {
    Text      string              // Raw text output
    JSON      map[string]any      // Parsed JSON if structured output requested
    Messages  []LLMMessage        // Conversation messages including response
    Usage     LLMTokenUsage       // Token consumption
    Provider  string              // Provider ID that handled the request
    Model     string              // Model that generated the response
    ToolCalls []LLMToolCall       // Tool calls requested by the model
    Reasoning *LLMReasoningOutput // Reasoning output (optional)
    Status    string              // Response status (optional)
    Meta      map[string]any      // Additional response metadata
}
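
A brief sketch of inspecting an LLMResponse value. How you obtain the response (from the envelope under the node's OutputKey, or directly from the client) depends on your setup; the logging here is purely illustrative:

// Sketch: read the fields documented above from a response.
func inspectResponse(resp *LLMResponse) {
    log.Printf("provider=%s model=%s text=%q", resp.Provider, resp.Model, resp.Text)

    // Tool calls the model wants executed
    for _, call := range resp.ToolCalls {
        log.Printf("tool call %s: %s(%v)", call.ID, call.Name, call.Arguments)
    }

    // Reasoning summary, when the model provides one
    if resp.Reasoning != nil {
        for _, s := range resp.Reasoning.Summary {
            log.Println("reasoning:", s)
        }
    }
}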
Putting it together, a graph can loop between an LLM node, a router, and a ToolNode until the model stops requesting tools:

// Build a workflow that handles tool calls
g := petalflow.NewGraph("tool-workflow")

// LLM node that may request tool calls
llmNode := petalflow.NewLLMNode("agent", client, petalflow.LLMNodeConfig{
    Model:        "gpt-4o",
    SystemPrompt: "You are a helpful assistant with access to tools.",
    OutputKey:    "agent_response",
})

// Tool node to execute requested tools
toolNode := petalflow.NewToolNode("executor", petalflow.ToolNodeConfig{
    ToolName:  "search",
    OutputKey: "tool_result",
})

// Router to check if tool calls are needed
router := petalflow.NewRuleRouter("check_tools", petalflow.RuleRouterConfig{
    Routes: []petalflow.RouteRule{
        {
            When: petalflow.RouteCondition{
                Var: "agent_response.tool_calls",
                Op:  petalflow.OpNotEmpty,
            },
            To: "executor",
        },
    },
    Default: "output",
})

g.AddNode(llmNode)
g.AddNode(toolNode)
g.AddNode(router)
g.AddEdge("agent", "check_tools")
g.AddEdge("executor", "agent") // Loop back with tool results

ToolNode executes external tools, APIs, or functions. Tools are registered with the Iris client and called by name.

| Field | Type | Description |
| --- | --- | --- |
| ToolName | string | Name of the registered tool to call |
| InputMapping | map[string]string | Map envelope keys to tool parameters |
| OutputKey | string | Envelope key to store the result |
| Timeout | time.Duration | Maximum execution time |
| RetryPolicy | RetryPolicy | Retry configuration for failures |
// Register tool with Iris client
client.RegisterTool(iris.Tool{
    Name:        "search_knowledge_base",
    Description: "Search the knowledge base for relevant documents",
    Parameters: iris.Parameters{
        Type: "object",
        Properties: map[string]iris.Property{
            "query": {Type: "string", Description: "Search query"},
            "limit": {Type: "integer", Description: "Max results"},
        },
        Required: []string{"query"},
    },
    Handler: searchHandler,
})

// Create node that calls the tool
node := petalflow.NewToolNode("search", petalflow.ToolNodeConfig{
    ToolName: "search_knowledge_base",
    InputMapping: map[string]string{
        "query": "user_query", // envelope key -> tool param
        "limit": "result_limit",
    },
    OutputKey: "search_results",
    Timeout:   30 * time.Second,
})

RuleRouter routes execution to different nodes based on conditional rules. Rules are evaluated in order; the first match determines the route.

| Field | Type | Description |
| --- | --- | --- |
| Routes | []RouteRule | Ordered list of routing rules |
| Default | string | Fallback node if no rules match |

Each RouteRule pairs a condition with a target node:

| Field | Type | Description |
| --- | --- | --- |
| When | RouteCondition | Condition to evaluate |
| To | string | Target node ID when condition matches |

Condition operators:

| Operator | Description |
| --- | --- |
| OpEquals | Exact equality |
| OpNotEquals | Not equal |
| OpContains | String contains substring |
| OpStartsWith | String starts with prefix |
| OpEndsWith | String ends with suffix |
| OpGt, OpGte | Greater than (or equal) |
| OpLt, OpLte | Less than (or equal) |
| OpIn | Value in list |
| OpNotIn | Value not in list |
| OpEmpty | Value is empty/nil |
| OpNotEmpty | Value is not empty |
router := petalflow.NewRuleRouter("priority_router", petalflow.RuleRouterConfig{
    Routes: []petalflow.RouteRule{
        // High priority tickets go to urgent handler
        {
            When: petalflow.RouteCondition{
                Var:   "priority",
                Op:    petalflow.OpEquals,
                Value: "high",
            },
            To: "urgent_handler",
        },
        // VIP customers get dedicated support
        {
            When: petalflow.RouteCondition{
                Var:   "customer_tier",
                Op:    petalflow.OpIn,
                Value: []string{"platinum", "gold"},
            },
            To: "vip_handler",
        },
        // Large orders need review
        {
            When: petalflow.RouteCondition{
                Var:   "order_total",
                Op:    petalflow.OpGte,
                Value: 10000,
            },
            To: "review_handler",
        },
    },
    Default: "standard_handler",
})

LLMRouter routes execution based on LLM classification. The model analyzes input and selects from predefined categories.

| Field | Type | Description |
| --- | --- | --- |
| Model | string | Model to use for classification |
| Categories | []Category | Available routing categories |
| PromptTemplate | string | Template for classification prompt |
| InputKey | string | Envelope key containing input to classify |

Each Category gives the model a routing target and a hint for choosing it:

| Field | Type | Description |
| --- | --- | --- |
| Name | string | Category identifier (used as node ID target) |
| Description | string | Description to help the LLM classify |
router := petalflow.NewLLMRouter("intent_classifier", client, petalflow.LLMRouterConfig{
    Model:    "gpt-4o-mini",
    InputKey: "user_message",
    Categories: []petalflow.Category{
        {Name: "billing", Description: "Questions about invoices, payments, refunds, or pricing"},
        {Name: "technical", Description: "Technical issues, bugs, errors, or how-to questions"},
        {Name: "sales", Description: "Product inquiries, upgrades, or new purchases"},
        {Name: "general", Description: "General questions, feedback, or other topics"},
    },
    PromptTemplate: `Classify the following customer message into exactly one category.
Message: {{.Vars.user_message}}
Categories:
{{range .Categories}}- {{.Name}}: {{.Description}}
{{end}}
Respond with only the category name.`,
})

FilterNode filters a list in the envelope based on criteria. Useful for narrowing search results, removing invalid entries, or selecting top items.

| Field | Type | Description |
| --- | --- | --- |
| InputKey | string | Envelope key containing the list to filter |
| OutputKey | string | Envelope key for filtered results |
| Criteria | []FilterCriterion | Filter conditions (all must pass) |
| Limit | int | Maximum items to return (0 = no limit) |
| SortBy | string | Field to sort by before limiting |
| SortDesc | bool | Sort descending (default ascending) |

Each FilterCriterion compares a field on every item against a value:

| Field | Type | Description |
| --- | --- | --- |
| Field | string | Field name to check on each item |
| Op | FilterOp | Comparison operator |
| Value | any | Value to compare against |
// Filter search results to high-relevance, recent documents
filter := petalflow.NewFilterNode("filter_results", petalflow.FilterNodeConfig{
    InputKey:  "search_results",
    OutputKey: "filtered_results",
    Criteria: []petalflow.FilterCriterion{
        {Field: "relevance_score", Op: petalflow.FilterGte, Value: 0.7},
        {Field: "published_date", Op: petalflow.FilterGte, Value: "2024-01-01"},
        {Field: "status", Op: petalflow.FilterEquals, Value: "published"},
    },
    SortBy:   "relevance_score",
    SortDesc: true,
    Limit:    5,
})

TransformNode transforms data in the envelope using Go functions or templates. Use it for parsing, formatting, restructuring, or computing derived values.

| Field | Type | Description |
| --- | --- | --- |
| InputKeys | []string | Envelope keys to read |
| OutputKey | string | Envelope key for result |
| Transform | TransformFunc | Function that performs the transformation |
| Template | string | Alternative: use a Go template for simple transforms |
transform := petalflow.NewTransformNode("parse_json", petalflow.TransformNodeConfig{
    InputKeys: []string{"raw_response"},
    OutputKey: "parsed_data",
    Transform: func(inputs map[string]any) (any, error) {
        raw := inputs["raw_response"].(string)
        var data map[string]any
        if err := json.Unmarshal([]byte(raw), &data); err != nil {
            return nil, fmt.Errorf("invalid JSON: %w", err)
        }
        // Extract and restructure
        return map[string]any{
            "title":   data["name"],
            "content": data["body"],
            "tags":    data["labels"],
        }, nil
    },
})
Alternatively, use a Go template for simple formatting:

transform := petalflow.NewTransformNode("format_response", petalflow.TransformNodeConfig{
    InputKeys: []string{"summary", "sources"},
    OutputKey: "formatted_response",
    Template: `## Summary
{{.Vars.summary}}
## Sources
{{range .Vars.sources}}- {{.title}}: {{.url}}
{{end}}`,
})

MergeNode waits for multiple parallel branches to complete, then merges their results into a single envelope value.

| Field | Type | Description |
| --- | --- | --- |
| InputKeys | []string | Envelope keys to merge (one per branch) |
| OutputKey | string | Envelope key for merged result |
| MergeStyle | MergeStyle | How to combine values |
| MergeFunc | MergeFunc | Custom merge function (optional) |

Merge styles:

| Style | Description |
| --- | --- |
| MergeStyleArray | Collect all values into an array |
| MergeStyleObject | Combine into object with input keys as fields |
| MergeStyleConcat | Concatenate string values |
| MergeStyleFirst | Use first non-nil value |
| MergeStyleCustom | Use provided MergeFunc |
// Merge parallel analysis results into a single object
merge := petalflow.NewMergeNode("combine_analysis", petalflow.MergeNodeConfig{
    InputKeys:  []string{"sentiment_result", "entity_result", "summary_result"},
    OutputKey:  "full_analysis",
    MergeStyle: petalflow.MergeStyleObject,
})
// Result structure:
// {
//   "sentiment_result": {...},
//   "entity_result": {...},
//   "summary_result": {...}
// }
With MergeStyleCustom, a MergeFunc combines the values:

merge := petalflow.NewMergeNode("weighted_merge", petalflow.MergeNodeConfig{
    InputKeys:  []string{"score_a", "score_b", "score_c"},
    OutputKey:  "final_score",
    MergeStyle: petalflow.MergeStyleCustom,
    MergeFunc: func(values map[string]any) (any, error) {
        a := values["score_a"].(float64)
        b := values["score_b"].(float64)
        c := values["score_c"].(float64)
        // Weighted average
        return a*0.5 + b*0.3 + c*0.2, nil
    },
})

GuardianNode validates envelope data against constraints. Guards can reject invalid data, log warnings, or trigger alternative paths.

| Field | Type | Description |
| --- | --- | --- |
| Checks | []GuardCheck | Validation rules to apply |
| OnFail | GuardAction | Action when validation fails |
| FailRoute | string | Node to route to on failure (if action is Route) |
| ErrorKey | string | Envelope key to store error details |

Failure actions:

| Action | Description |
| --- | --- |
| GuardActionReject | Stop execution with error |
| GuardActionWarn | Log warning and continue |
| GuardActionRoute | Route to failure handling node |
| GuardActionSkip | Skip remaining nodes in current branch |
guard := petalflow.NewGuardianNode("input_validator", petalflow.GuardianNodeConfig{
    Checks: []petalflow.GuardCheck{
        // Required field check
        {
            Var:     "query",
            Op:      petalflow.OpNotEmpty,
            Message: "Query is required",
        },
        // Length constraint
        {
            Var:     "query",
            Op:      petalflow.OpMaxLength,
            Value:   10000,
            Message: "Query exceeds maximum length of 10000 characters",
        },
        // Format validation
        {
            Var:     "email",
            Op:      petalflow.OpMatches,
            Value:   `^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`,
            Message: "Invalid email format",
        },
        // Numeric range
        {
            Var:     "quantity",
            Op:      petalflow.OpBetween,
            Value:   []int{1, 100},
            Message: "Quantity must be between 1 and 100",
        },
    },
    OnFail:    petalflow.GuardActionRoute,
    FailRoute: "validation_error_handler",
    ErrorKey:  "validation_errors",
})

CacheNode caches the results of expensive operations. Subsequent executions with the same cache key return cached values without re-execution.

| Field | Type | Description |
| --- | --- | --- |
| CacheKey | string | Template for generating cache keys |
| Store | CacheStore | Cache backend (memory, Redis, etc.) |
| TTL | time.Duration | Time-to-live for cached entries |
| InputKeys | []string | Envelope keys to cache |
| Enabled | bool | Enable/disable caching |
// Cache expensive LLM embeddings
cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{
    CacheKey:  "embedding:{{.Vars.document_id}}:{{.Vars.model}}",
    Store:     redisStore,
    TTL:       24 * time.Hour,
    InputKeys: []string{"embedding_vector"},
    Enabled:   true,
})

// Use with an LLM node
g.AddNode(petalflow.NewLLMNode("generate_embedding", client, embeddingConfig))
g.AddNode(cache)
g.AddEdge("generate_embedding", "embedding_cache")
Cache backends implement the CacheStore interface:

type CacheStore interface {
    Get(ctx context.Context, key string) (any, bool, error)
    Set(ctx context.Context, key string, value any, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
}

// Built-in stores
store := petalflow.NewMemoryCacheStore()           // In-memory (single instance)
store := petalflow.NewRedisCacheStore(redisClient) // Redis (distributed)
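
Any backend that satisfies CacheStore can be plugged in. A minimal sketch of a custom store backed by a plain map (it ignores TTL and is not safe for concurrent use; this is an illustration of the interface, not one of the built-ins):

// Sketch: map-backed CacheStore implementing the interface above.
type mapStore struct {
    data map[string]any
}

func newMapStore() *mapStore {
    return &mapStore{data: make(map[string]any)}
}

func (s *mapStore) Get(ctx context.Context, key string) (any, bool, error) {
    v, ok := s.data[key]
    return v, ok, nil
}

func (s *mapStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    s.data[key] = value // ttl is ignored in this sketch
    return nil
}

func (s *mapStore) Delete(ctx context.Context, key string) error {
    delete(s.data, key)
    return nil
}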

HumanNode pauses execution for human review and approval. It integrates with external approval systems via callbacks or webhooks.

| Field | Type | Description |
| --- | --- | --- |
| ReviewKey | string | Envelope key containing data for review |
| ApprovalKey | string | Envelope key to store approval decision |
| Callback | HumanCallback | Function called when review is needed |
| Timeout | time.Duration | Maximum wait time for approval |
| OnTimeout | TimeoutAction | Action if approval times out |
human := petalflow.NewHumanNode("manager_approval", petalflow.HumanNodeConfig{
    ReviewKey:   "refund_request",
    ApprovalKey: "approval_decision",
    Timeout:     48 * time.Hour,
    OnTimeout:   petalflow.TimeoutActionReject,
    Callback: func(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
        request := env.GetVar("refund_request")

        // Send to external approval system
        ticketID, err := approvalSystem.CreateTicket(ctx, ApprovalTicket{
            Type:    "refund",
            Data:    request,
            Urgency: "normal",
        })
        if err != nil {
            return petalflow.ApprovalResult{}, err
        }

        // Wait for decision (approval system calls back)
        decision, err := approvalSystem.WaitForDecision(ctx, ticketID)
        if err != nil {
            return petalflow.ApprovalResult{}, err
        }

        return petalflow.ApprovalResult{
            Approved: decision.Approved,
            Reviewer: decision.ReviewerID,
            Notes:    decision.Comments,
        }, nil
    },
})

For interactive approval in development or testing:

controller := petalflow.NewBreakpointStepController([]string{"manager_approval"})
opts := petalflow.NewStepRunOptions(controller)
// Graph pauses at HumanNode, allowing inspection and manual approval
result, err := runtime.Run(ctx, graph, env, opts)

Create custom nodes by implementing the Node interface:

type Node interface {
    ID() string
    Execute(ctx context.Context, env *Envelope) error
}

// Example custom node
type CustomAggregator struct {
    id        string
    threshold float64
}

func NewCustomAggregator(id string, threshold float64) *CustomAggregator {
    return &CustomAggregator{id: id, threshold: threshold}
}

func (n *CustomAggregator) ID() string {
    return n.id
}

func (n *CustomAggregator) Execute(ctx context.Context, env *petalflow.Envelope) error {
    scores := env.GetVar("scores").([]float64)

    var sum float64
    for _, s := range scores {
        sum += s
    }
    avg := sum / float64(len(scores))

    env.SetVar("average_score", avg)
    env.SetVar("above_threshold", avg >= n.threshold)
    return nil
}

Register and use in graphs:

g.AddNode(NewCustomAggregator("score_aggregator", 0.75))
g.AddEdge("collect_scores", "score_aggregator")