
PetalFlow Core API

The core package provides the foundational types and interfaces for PetalFlow workflows.

import "github.com/petal-labs/petalflow/core"

Or use the top-level re-exports:

import "github.com/petal-labs/petalflow"

| Type | Purpose |
| --- | --- |
| `Envelope` | Data carrier between nodes |
| `Node` / `NodeFunc` | Core node interface and function type |
| `Message` | Chat-style message for LLM steps |
| `Artifact` | Document or derived data produced during a run |
| `TraceInfo` | Execution metadata and timing |
| `LLMClient` | LLM provider abstraction |
| `LLMRequest` / `LLMResponse` | Standard LLM request/response data |
| `LLMMessage` | Chat message with tool call support |
| `LLMToolCall` / `LLMToolResult` | Tool invocation types |
| `LLMReasoningOutput` | Reasoning model output |
| `PetalTool` / `ToolRegistry` | Tool definitions for nodes |

The Envelope is the single data structure passed between nodes. It carries all state through a workflow run.

```go
type Envelope struct {
    Input     any            // Primary payload for a run
    Vars      map[string]any // Shared state across the run
    Artifacts []Artifact     // Documents, chunks, citations, files
    Messages  []Message      // Chat-style messages for LLM steps
    Errors    []NodeError    // Accumulated errors (continue-on-error)
    Trace     TraceInfo      // Observability and replay info
}

func NewEnvelope() *Envelope
```
| Method | Description |
| --- | --- |
| `Clone() *Envelope` | Deep copy for parallel execution |
| `GetVar(name string) (any, bool)` | Retrieve a variable by name |
| `GetVarString(name string) string` | Retrieve a variable as a string |
| `GetVarNested(path string) (any, bool)` | Retrieve a nested variable using dot notation |
| `SetVar(name string, value any)` | Set a variable in the `Vars` map |
| `AppendArtifact(artifact Artifact)` | Add an artifact to the envelope |
| `AppendMessage(msg Message)` | Add a message to the envelope |
| `AppendError(err NodeError)` | Record a node error |
| `HasErrors() bool` | Report whether any errors have been recorded |
| `GetArtifactsByType(artifactType string) []Artifact` | Filter artifacts by type |
| `GetLastMessage() *Message` | Get the most recent message |
| `GetMessagesByRole(role string) []Message` | Filter messages by role |
| `WithInput(input any) *Envelope` | Fluent setter for the input |
| `WithVar(name string, value any) *Envelope` | Fluent setter for a variable |
| `WithTrace(trace TraceInfo) *Envelope` | Fluent setter for the trace |
```go
env := core.NewEnvelope()

// Set variables
env.SetVar("user_id", "usr_123")
env.SetVar("query", "How do I reset my password?")

// Use the fluent style
env = core.NewEnvelope().
    WithVar("input", userInput).
    WithVar("context", contextData)

// Get values
userID := env.GetVarString("user_id")

// Get nested values using dot notation
responseID, ok := env.GetVarNested("response.data.id")

// Add messages for LLM context
env.AppendMessage(core.Message{
    Role:    "system",
    Content: "You are a helpful assistant.",
})

// Clone for parallel branches
clonedEnv := env.Clone()
```

Nodes are the fundamental units of execution in a PetalFlow graph.

```go
type Node interface {
    ID() string
    Kind() NodeKind
    Execute(ctx context.Context, env *Envelope) error
}

const (
    NodeKindLLM            NodeKind = "llm"
    NodeKindTool           NodeKind = "tool"
    NodeKindRouter         NodeKind = "router"
    NodeKindMerge          NodeKind = "merge"
    NodeKindMap            NodeKind = "map"
    NodeKindGate           NodeKind = "gate"
    NodeKindNoop           NodeKind = "noop"
    NodeKindFilter         NodeKind = "filter"
    NodeKindTransform      NodeKind = "transform"
    NodeKindGuardian       NodeKind = "guardian"
    NodeKindCache          NodeKind = "cache"
    NodeKindWebhookCall    NodeKind = "webhook_call"
    NodeKindWebhookTrigger NodeKind = "webhook_trigger"
    NodeKindHuman          NodeKind = "human"
    NodeKindConditional    NodeKind = "conditional"
)
```

Wrap a function as a node:

```go
type NodeFunc = func(ctx context.Context, env *Envelope) (*Envelope, error)

func NewFuncNode(id string, fn NodeFunc) *FuncNode
```

```go
node := core.NewFuncNode("transform", func(ctx context.Context, env *core.Envelope) (*core.Envelope, error) {
    input := env.GetVarString("input")
    env.SetVar("output", strings.ToUpper(input))
    return env, nil
})
```

A base implementation for custom nodes:

```go
type BaseNode struct {
    id   string
    kind NodeKind
}

func NewBaseNode(id string, kind NodeKind) *BaseNode
```
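Embedding `BaseNode` lets a custom node supply only `Execute`. A minimal sketch, assuming `BaseNode`'s `ID` and `Kind` methods satisfy those parts of the `Node` interface (`reverseNode` is illustrative, not part of the package):

```go
// reverseNode is a hypothetical custom node built on BaseNode.
type reverseNode struct {
    *core.BaseNode
}

func newReverseNode(id string) *reverseNode {
    return &reverseNode{BaseNode: core.NewBaseNode(id, core.NodeKindTransform)}
}

// Execute reverses the "input" variable and stores it as "output".
func (n *reverseNode) Execute(ctx context.Context, env *core.Envelope) error {
    runes := []rune(env.GetVarString("input"))
    for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 {
        runes[i], runes[j] = runes[j], runes[i]
    }
    env.SetVar("output", string(runes))
    return nil
}
```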

Chat-style message for LLM steps and auditing:

```go
type Message struct {
    Role    string         // "system" | "user" | "assistant" | "tool"
    Content string         // plain text; markdown allowed
    Name    string         // optional (tool name, agent role, etc.)
    Meta    map[string]any // optional metadata
}
```

Represents a document or derived data produced during a run:

```go
type Artifact struct {
    ID       string         // stable within a run
    Type     string         // e.g., "document", "chunk", "citation", "json"
    MimeType string         // e.g., "text/plain", "application/json"
    Text     string         // optional textual content
    Bytes    []byte         // optional binary content
    URI      string         // optional pointer to external storage
    Meta     map[string]any // flexible metadata
}
```
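For example, a retrieval step might append a text chunk that a later node filters for (IDs and field values here are illustrative):

```go
env.AppendArtifact(core.Artifact{
    ID:       "chunk_001",
    Type:     "chunk",
    MimeType: "text/plain",
    Text:     "Password resets live under Settings > Security.",
    Meta:     map[string]any{"source": "kb_article_42"},
})

// Downstream, filter by type.
chunks := env.GetArtifactsByType("chunk")
```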

Propagated by the runtime for observability and replay:

```go
type TraceInfo struct {
    RunID    string    // unique identifier for this run
    ParentID string    // optional: for subgraphs or map/fanout
    SpanID   string    // optional: for node-level tracing
    TraceID  string    // OpenTelemetry trace ID
    Started  time.Time // when the run started
}
```

PetalFlow v0.2.0 introduces comprehensive LLM types for multi-turn tool use workflows.

```go
type LLMClient interface {
    Complete(ctx context.Context, req LLMRequest) (LLMResponse, error)
}

type StreamingLLMClient interface {
    LLMClient
    CompleteStream(ctx context.Context, req LLMRequest) (<-chan StreamChunk, error)
}
```
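Any type with a matching `Complete` method satisfies `LLMClient`, which makes test doubles straightforward. A minimal sketch (`staticClient` is illustrative, not part of the package):

```go
// staticClient is a hypothetical LLMClient that always returns a fixed reply.
type staticClient struct {
    reply string
}

func (c *staticClient) Complete(ctx context.Context, req core.LLMRequest) (core.LLMResponse, error) {
    return core.LLMResponse{
        Text:     c.reply,
        Provider: "static",
        Model:    req.Model,
    }, nil
}
```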
The request passed to `Complete`:

```go
type LLMRequest struct {
    Model        string         // model identifier (e.g., "gpt-4", "claude-3-opus")
    System       string         // system prompt (Chat Completions API style)
    Instructions string         // system instructions (Responses API style)
    Messages     []LLMMessage   // conversation messages
    InputText    string         // optional: simple prompt mode
    JSONSchema   map[string]any // optional: structured output constraints
    Temperature  *float64       // optional: sampling temperature
    MaxTokens    *int           // optional: maximum output tokens
    Meta         map[string]any // trace/cost controls
}
```
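`Temperature` and `MaxTokens` are pointers so that an unset value is distinguishable from an explicit zero. A sketch of building a request (model name illustrative):

```go
temp := 0.2
maxTokens := 512

req := core.LLMRequest{
    Model:  "gpt-4",
    System: "You are a concise assistant.",
    Messages: []core.LLMMessage{
        {Role: "user", Content: "Summarize the release notes."},
    },
    Temperature: &temp,
    MaxTokens:   &maxTokens,
}
```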

Chat message with tool call support:

```go
type LLMMessage struct {
    Role        string          // "system", "user", "assistant", "tool"
    Content     string          // message content
    Name        string          // optional: tool name, agent role
    ToolCalls   []LLMToolCall   // for assistant messages with pending tool calls
    ToolResults []LLMToolResult // for tool result messages (Role="tool")
    Meta        map[string]any  // optional metadata
}
```
The response returned by `Complete`:

```go
type LLMResponse struct {
    Text      string              // raw text output
    JSON      map[string]any      // parsed JSON if structured output was requested
    Messages  []LLMMessage        // conversation messages including the response
    Usage     LLMTokenUsage       // token consumption
    Provider  string              // provider ID that handled the request
    Model     string              // model that generated the response
    ToolCalls []LLMToolCall       // tool calls requested by the model
    Reasoning *LLMReasoningOutput // reasoning output (optional)
    Status    string              // response status (optional)
    Meta      map[string]any      // additional response metadata
}
```

Represents a tool invocation requested by the model:

```go
type LLMToolCall struct {
    ID        string
    Name      string
    Arguments map[string]any
}
```

Result of executing a tool for multi-turn workflows:

```go
type LLMToolResult struct {
    CallID  string // Must match LLMToolCall.ID from the response
    Content any    // Result data (will be JSON marshaled by the adapter)
    IsError bool   // True if this represents an error result
}
```
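Together these types support the multi-turn loop: send a request, execute any requested tool calls, append the results as a `Role: "tool"` message, and resend. A condensed sketch, assuming `client` (an `LLMClient`) and `registry` (a `ToolRegistry`) are already in scope; error handling is abbreviated:

```go
msgs := []core.LLMMessage{{Role: "user", Content: "What's the weather in Oslo?"}}

for {
    resp, err := client.Complete(ctx, core.LLMRequest{Model: "gpt-4", Messages: msgs})
    if err != nil {
        log.Fatal(err)
    }

    // No tool calls means the model produced its final answer.
    if len(resp.ToolCalls) == 0 {
        fmt.Println(resp.Text)
        break
    }

    // Echo the assistant turn, then execute each requested tool.
    msgs = append(msgs, core.LLMMessage{Role: "assistant", ToolCalls: resp.ToolCalls})
    var results []core.LLMToolResult
    for _, call := range resp.ToolCalls {
        tool, ok := registry.Get(call.Name)
        if !ok {
            results = append(results, core.LLMToolResult{CallID: call.ID, Content: "unknown tool: " + call.Name, IsError: true})
            continue
        }
        out, err := tool.Invoke(ctx, call.Arguments)
        if err != nil {
            results = append(results, core.LLMToolResult{CallID: call.ID, Content: err.Error(), IsError: true})
            continue
        }
        results = append(results, core.LLMToolResult{CallID: call.ID, Content: out})
    }

    // Feed the results back as a tool message and let the model continue.
    msgs = append(msgs, core.LLMMessage{Role: "tool", ToolResults: results})
}
```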

Reasoning information from models that support it (e.g., o1, o3):

```go
type LLMReasoningOutput struct {
    ID      string   // Reasoning output identifier
    Summary []string // Reasoning summary points
}
```
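When a response comes from a reasoning model, the field is non-nil and the summary can be logged or audited:

```go
if resp.Reasoning != nil {
    for _, point := range resp.Reasoning.Summary {
        fmt.Println("reasoning:", point)
    }
}
```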
Token consumption for a single LLM call:

```go
type LLMTokenUsage struct {
    InputTokens  int
    OutputTokens int
    TotalTokens  int
    CostUSD      float64 // optional: computed cost
}
```

Tools expose a name and a single invocation method:

```go
type PetalTool interface {
    Name() string
    Invoke(ctx context.Context, args map[string]any) (map[string]any, error)
}
```

Simple function-backed tool:

```go
func NewFuncTool(name, description string, fn func(ctx context.Context, args map[string]any) (map[string]any, error)) *FuncTool
```

```go
type ToolRegistry struct { /* ... */ }

func NewToolRegistry() *ToolRegistry
func (r *ToolRegistry) Register(tool PetalTool)
func (r *ToolRegistry) Get(name string) (PetalTool, bool)
func (r *ToolRegistry) List() []string
```
```go
registry := core.NewToolRegistry()

// Register a function-backed tool
searchTool := core.NewFuncTool("search", "Search the database", func(ctx context.Context, args map[string]any) (map[string]any, error) {
    query, ok := args["query"].(string)
    if !ok {
        return nil, fmt.Errorf("search: expected string argument %q", "query")
    }
    results, err := database.Search(ctx, query)
    if err != nil {
        return nil, err
    }
    return map[string]any{"results": results}, nil
})
registry.Register(searchTool)

// Retrieve and invoke
if tool, ok := registry.Get("search"); ok {
    result, err := tool.Invoke(ctx, map[string]any{"query": "example"})
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(result["results"])
}
```

Recorded when nodes fail but the graph continues (with ErrorPolicyContinue):

```go
type NodeError struct {
    NodeID  string         // ID of the node that failed
    Kind    NodeKind       // kind of the node
    Message string         // error message
    Attempt int            // which attempt this was (1-indexed)
    At      time.Time      // when the error occurred
    Details map[string]any // additional error context
    Cause   error          // underlying error (may be nil)
}
```
The error policy controls how the runtime reacts when a node fails:

```go
const (
    ErrorPolicyFail     ErrorPolicy = "fail"     // Abort the run (default)
    ErrorPolicyContinue ErrorPolicy = "continue" // Record the error and continue
    ErrorPolicyRecord   ErrorPolicy = "record"   // Record in the envelope and continue
)
```
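With a continue-style policy, downstream nodes can inspect what failed. A sketch using only the fields documented above:

```go
if env.HasErrors() {
    for _, nodeErr := range env.Errors {
        log.Printf("node %s (%s) failed on attempt %d: %s",
            nodeErr.NodeID, nodeErr.Kind, nodeErr.Attempt, nodeErr.Message)
    }
}
```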

Produced by RouterNode to indicate which targets to activate:

```go
type RouteDecision struct {
    Targets    []string       // node IDs to route to
    Reason     string         // explanation for the decision
    Confidence *float64       // optional confidence score (0.0-1.0)
    Meta       map[string]any // additional routing metadata
}
```
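A decision might look like the following sketch (target ID and values are illustrative):

```go
conf := 0.92
decision := core.RouteDecision{
    Targets:    []string{"billing_agent"},
    Reason:     "query mentions an unpaid invoice",
    Confidence: &conf,
}
```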
```go
type RetryPolicy struct {
    MaxAttempts int           // maximum number of attempts (1 = no retries)
    Backoff     time.Duration // base backoff duration between attempts
}

func DefaultRetryPolicy() RetryPolicy // Returns 3 attempts, 1s backoff
```
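A custom policy might allow more attempts with a longer base delay:

```go
policy := core.RetryPolicy{
    MaxAttempts: 5,               // the first attempt plus up to four retries
    Backoff:     2 * time.Second, // base delay between attempts
}
```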

Guardrail for LLM calls to limit resource usage:

```go
type Budget struct {
    MaxInputTokens  int
    MaxOutputTokens int
    MaxTotalTokens  int
    MaxCostUSD      float64
}
```

Accumulated token consumption across calls:

```go
type TokenUsage struct {
    InputTokens  int
    OutputTokens int
    TotalTokens  int
    CostUSD      float64
}

func (u TokenUsage) Add(other TokenUsage) TokenUsage
```
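A sketch of accumulating usage and comparing it against a budget. The comparison below is a hypothetical helper written inline; budget enforcement itself is handled by the runtime:

```go
total := core.TokenUsage{}
total = total.Add(core.TokenUsage{InputTokens: 1200, OutputTokens: 300, TotalTokens: 1500, CostUSD: 0.020})
total = total.Add(core.TokenUsage{InputTokens: 800, OutputTokens: 450, TotalTokens: 1250, CostUSD: 0.015})

budget := core.Budget{MaxTotalTokens: 4000, MaxCostUSD: 0.10}

// Hypothetical check; not a method provided by the core package.
if (budget.MaxTotalTokens > 0 && total.TotalTokens > budget.MaxTotalTokens) ||
    (budget.MaxCostUSD > 0 && total.CostUSD > budget.MaxCostUSD) {
    log.Println("LLM budget exceeded; aborting further calls")
}
```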