# PetalFlow Core API
The core package provides the foundational types and interfaces for PetalFlow workflows.
```go
import "github.com/petal-labs/petalflow/core"
```

Or use the top-level re-exports:

```go
import "github.com/petal-labs/petalflow"
```

## Type Reference
| Type | Purpose |
|---|---|
| `Envelope` | Data carrier between nodes |
| `Node` / `NodeFunc` | Core node interface and function type |
| `Message` | Chat-style message for LLM steps |
| `Artifact` | Document or derived data produced during a run |
| `TraceInfo` | Execution metadata and timing |
| `LLMClient` | LLM provider abstraction |
| `LLMRequest` / `LLMResponse` | Standard LLM request/response data |
| `LLMMessage` | Chat message with tool call support |
| `LLMToolCall` / `LLMToolResult` | Tool invocation types |
| `LLMReasoningOutput` | Reasoning model output |
| `PetalTool` / `ToolRegistry` | Tool definitions for nodes |
## Envelope

The `Envelope` is the single data structure passed between nodes. It carries all state through a workflow run.
### Structure

```go
type Envelope struct {
	Input     any            // Primary payload for a run
	Vars      map[string]any // Shared state across the run
	Artifacts []Artifact     // Documents, chunks, citations, files
	Messages  []Message      // Chat-style messages for LLM steps
	Errors    []NodeError    // Accumulated errors (continue-on-error)
	Trace     TraceInfo      // Observability and replay info
}
```

### Constructors

```go
func NewEnvelope() *Envelope
```

### Methods
| Method | Description |
|---|---|
| `Clone() *Envelope` | Deep copy for parallel execution |
| `GetVar(name string) (any, bool)` | Retrieve a variable by name |
| `GetVarString(name string) string` | Retrieve a variable as a string |
| `GetVarNested(path string) (any, bool)` | Retrieve a nested variable using dot notation |
| `SetVar(name string, value any)` | Set a variable in the Vars map |
| `AppendArtifact(artifact Artifact)` | Add an artifact to the envelope |
| `AppendMessage(msg Message)` | Add a message to the envelope |
| `AppendError(err NodeError)` | Record a node error |
| `HasErrors() bool` | Check whether any errors have been recorded |
| `GetArtifactsByType(artifactType string) []Artifact` | Filter artifacts by type |
| `GetLastMessage() *Message` | Get the most recent message |
| `GetMessagesByRole(role string) []Message` | Filter messages by role |
| `WithInput(input any) *Envelope` | Fluent setter for input |
| `WithVar(name string, value any) *Envelope` | Fluent setter for a variable |
| `WithTrace(trace TraceInfo) *Envelope` | Fluent setter for trace |
### Example

```go
env := core.NewEnvelope()

// Set variables
env.SetVar("user_id", "usr_123")
env.SetVar("query", "How do I reset my password?")

// Use fluent style
env = core.NewEnvelope().
	WithVar("input", userInput).
	WithVar("context", contextData)

// Get values
userID := env.GetVarString("user_id")

// Get nested values using dot notation
responseID, ok := env.GetVarNested("response.data.id")

// Add messages for LLM context
env.AppendMessage(core.Message{
	Role:    "system",
	Content: "You are a helpful assistant.",
})

// Clone for parallel branches
clonedEnv := env.Clone()
```
## Node Interface

Nodes are the fundamental units of execution in a PetalFlow graph.

### Node Interface

```go
type Node interface {
	ID() string
	Kind() NodeKind
	Execute(ctx context.Context, env *Envelope) error
}
```

### NodeKind Constants

```go
const (
	NodeKindLLM            NodeKind = "llm"
	NodeKindTool           NodeKind = "tool"
	NodeKindRouter         NodeKind = "router"
	NodeKindMerge          NodeKind = "merge"
	NodeKindMap            NodeKind = "map"
	NodeKindGate           NodeKind = "gate"
	NodeKindNoop           NodeKind = "noop"
	NodeKindFilter         NodeKind = "filter"
	NodeKindTransform      NodeKind = "transform"
	NodeKindGuardian       NodeKind = "guardian"
	NodeKindCache          NodeKind = "cache"
	NodeKindWebhookCall    NodeKind = "webhook_call"
	NodeKindWebhookTrigger NodeKind = "webhook_trigger"
	NodeKindHuman          NodeKind = "human"
	NodeKindConditional    NodeKind = "conditional"
)
```

### FuncNode
Wrap a function as a node:

```go
type NodeFunc = func(ctx context.Context, env *Envelope) (*Envelope, error)

func NewFuncNode(id string, fn NodeFunc) *FuncNode
```

### Example

```go
node := core.NewFuncNode("transform", func(ctx context.Context, env *core.Envelope) (*core.Envelope, error) {
	input := env.GetVarString("input")
	env.SetVar("output", strings.ToUpper(input))
	return env, nil
})
```

### BaseNode
A base implementation for custom nodes:

```go
type BaseNode struct {
	id   string
	kind NodeKind
}

func NewBaseNode(id string, kind NodeKind) *BaseNode
```
## Message

Chat-style message for LLM steps and auditing:

```go
type Message struct {
	Role    string         // "system" | "user" | "assistant" | "tool"
	Content string         // plain text; markdown allowed
	Name    string         // optional (tool name, agent role, etc.)
	Meta    map[string]any // optional metadata
}
```

## Artifact
Represents a document or derived data produced during a run:

```go
type Artifact struct {
	ID       string         // stable within a run
	Type     string         // e.g., "document", "chunk", "citation", "json"
	MimeType string         // e.g., "text/plain", "application/json"
	Text     string         // optional textual content
	Bytes    []byte         // optional binary content
	URI      string         // optional pointer to external storage
	Meta     map[string]any // flexible metadata
}
```
## TraceInfo

Propagated by the runtime for observability and replay:

```go
type TraceInfo struct {
	RunID    string    // unique identifier for this run
	ParentID string    // optional: for subgraphs or map/fanout
	SpanID   string    // optional: for node-level tracing
	TraceID  string    // OpenTelemetry trace ID
	Started  time.Time // when the run started
}
```
## LLM Types

PetalFlow v0.2.0 introduces comprehensive LLM types for multi-turn tool-use workflows.

### LLMClient Interface

```go
type LLMClient interface {
	Complete(ctx context.Context, req LLMRequest) (LLMResponse, error)
}

type StreamingLLMClient interface {
	LLMClient
	CompleteStream(ctx context.Context, req LLMRequest) (<-chan StreamChunk, error)
}
```
### LLMRequest

```go
type LLMRequest struct {
	Model        string         // model identifier (e.g., "gpt-4", "claude-3-opus")
	System       string         // system prompt (Chat Completions API style)
	Instructions string         // system instructions (Responses API style)
	Messages     []LLMMessage   // conversation messages
	InputText    string         // optional: simple prompt mode
	JSONSchema   map[string]any // optional: structured output constraints
	Temperature  *float64       // optional: sampling temperature
	MaxTokens    *int           // optional: maximum output tokens
	Meta         map[string]any // trace/cost controls
}
```

### LLMMessage

Chat message with tool call support:
```go
type LLMMessage struct {
	Role        string          // "system", "user", "assistant", "tool"
	Content     string          // message content
	Name        string          // optional: tool name, agent role
	ToolCalls   []LLMToolCall   // for assistant messages with pending tool calls
	ToolResults []LLMToolResult // for tool result messages (Role="tool")
	Meta        map[string]any  // optional metadata
}
```

### LLMResponse

```go
type LLMResponse struct {
	Text      string              // raw text output
	JSON      map[string]any      // parsed JSON if structured output requested
	Messages  []LLMMessage        // conversation messages including response
	Usage     LLMTokenUsage       // token consumption
	Provider  string              // provider ID that handled the request
	Model     string              // model that generated the response
	ToolCalls []LLMToolCall       // tool calls requested by the model
	Reasoning *LLMReasoningOutput // reasoning output (optional)
	Status    string              // response status (optional)
	Meta      map[string]any      // additional response metadata
}
```

### LLMToolCall
Represents a tool invocation requested by the model:

```go
type LLMToolCall struct {
	ID        string
	Name      string
	Arguments map[string]any
}
```

### LLMToolResult
Result of executing a tool for multi-turn workflows:

```go
type LLMToolResult struct {
	CallID  string // Must match LLMToolCall.ID from the response
	Content any    // Result data (will be JSON marshaled by the adapter)
	IsError bool   // True if this represents an error result
}
```
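The key invariant in a multi-turn loop is that each result's `CallID` echoes the `ID` of the `LLMToolCall` it answers. The sketch below uses local stand-ins for the two types; the dispatch helper `runToolCalls` and its tool map are illustrative, not part of the package:

```go
package main

import "fmt"

// Local stand-ins mirroring the documented types (sketch only).
type LLMToolCall struct {
	ID        string
	Name      string
	Arguments map[string]any
}

type LLMToolResult struct {
	CallID  string
	Content any
	IsError bool
}

// runToolCalls executes each requested call and pairs the result with the
// originating call ID, which the adapter needs to build the next turn.
// Failures become results with IsError=true rather than aborting the loop.
func runToolCalls(calls []LLMToolCall, tools map[string]func(map[string]any) (any, error)) []LLMToolResult {
	results := make([]LLMToolResult, 0, len(calls))
	for _, call := range calls {
		fn, ok := tools[call.Name]
		if !ok {
			results = append(results, LLMToolResult{CallID: call.ID, Content: "unknown tool: " + call.Name, IsError: true})
			continue
		}
		out, err := fn(call.Arguments)
		if err != nil {
			results = append(results, LLMToolResult{CallID: call.ID, Content: err.Error(), IsError: true})
			continue
		}
		results = append(results, LLMToolResult{CallID: call.ID, Content: out})
	}
	return results
}

func main() {
	tools := map[string]func(map[string]any) (any, error){
		"add": func(args map[string]any) (any, error) {
			return args["a"].(float64) + args["b"].(float64), nil
		},
	}
	calls := []LLMToolCall{{ID: "call_1", Name: "add", Arguments: map[string]any{"a": 2.0, "b": 3.0}}}
	results := runToolCalls(calls, tools)
	fmt.Println(results[0].CallID, results[0].Content) // call_1 5
}
```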
### LLMReasoningOutput

Reasoning information from models that support it (e.g., o1, o3):

```go
type LLMReasoningOutput struct {
	ID      string   // Reasoning output identifier
	Summary []string // Reasoning summary points
}
```

### LLMTokenUsage
```go
type LLMTokenUsage struct {
	InputTokens  int
	OutputTokens int
	TotalTokens  int
	CostUSD      float64 // optional: computed cost
}
```

## Tool Interface
### PetalTool

```go
type PetalTool interface {
	Name() string
	Invoke(ctx context.Context, args map[string]any) (map[string]any, error)
}
```

### FuncTool
Simple function-backed tool:

```go
func NewFuncTool(name, description string, fn func(ctx context.Context, args map[string]any) (map[string]any, error)) *FuncTool
```

### ToolRegistry
```go
type ToolRegistry struct { /* ... */ }

func NewToolRegistry() *ToolRegistry
func (r *ToolRegistry) Register(tool PetalTool)
func (r *ToolRegistry) Get(name string) (PetalTool, bool)
func (r *ToolRegistry) List() []string
```

### Example
```go
registry := core.NewToolRegistry()

// Register a function-backed tool
searchTool := core.NewFuncTool("search", "Search the database", func(ctx context.Context, args map[string]any) (map[string]any, error) {
	query := args["query"].(string)
	results, err := database.Search(ctx, query)
	if err != nil {
		return nil, err
	}
	return map[string]any{"results": results}, nil
})

registry.Register(searchTool)

// Retrieve and invoke
tool, ok := registry.Get("search")
if ok {
	result, err := tool.Invoke(ctx, map[string]any{"query": "example"})
}
```

## Error Types
### NodeError

Recorded when nodes fail but the graph continues (with `ErrorPolicyContinue`):

```go
type NodeError struct {
	NodeID  string         // ID of the node that failed
	Kind    NodeKind       // kind of the node
	Message string         // error message
	Attempt int            // which attempt this was (1-indexed)
	At      time.Time      // when the error occurred
	Details map[string]any // additional error context
	Cause   error          // underlying error (may be nil)
}
```

### ErrorPolicy
```go
const (
	ErrorPolicyFail     ErrorPolicy = "fail"     // Abort the run (default)
	ErrorPolicyContinue ErrorPolicy = "continue" // Record error and continue
	ErrorPolicyRecord   ErrorPolicy = "record"   // Record in envelope and continue
)
```
## Other Types

### RouteDecision

Produced by `RouterNode` to indicate which targets to activate:

```go
type RouteDecision struct {
	Targets    []string       // node IDs to route to
	Reason     string         // explanation for the decision
	Confidence *float64       // optional confidence score (0.0-1.0)
	Meta       map[string]any // additional routing metadata
}
```
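A runner consumes a `RouteDecision` by resolving `Targets` against the node IDs in the graph. The sketch below uses a local stand-in; how the real runtime handles an unknown target (drop vs. hard error) is not stated in this reference, so dropping is an assumption:

```go
package main

import "fmt"

// Local stand-in for core.RouteDecision (sketch only).
type RouteDecision struct {
	Targets    []string
	Reason     string
	Confidence *float64
}

// nextNodes resolves a decision against the graph's known node IDs,
// silently dropping targets that don't exist (an assumption of this sketch).
func nextNodes(d RouteDecision, known map[string]bool) []string {
	var out []string
	for _, t := range d.Targets {
		if known[t] {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	conf := 0.92
	d := RouteDecision{
		Targets:    []string{"billing_agent", "archive"},
		Reason:     "query mentions an invoice",
		Confidence: &conf,
	}
	known := map[string]bool{"billing_agent": true, "support_agent": true}
	fmt.Println(nextNodes(d, known)) // [billing_agent]
}
```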
### RetryPolicy

```go
type RetryPolicy struct {
	MaxAttempts int           // maximum number of attempts (1 = no retries)
	Backoff     time.Duration // base backoff duration between attempts
}

func DefaultRetryPolicy() RetryPolicy // Returns 3 attempts, 1s backoff
```
### Budget

Guardrail for LLM calls to limit resource usage:

```go
type Budget struct {
	MaxInputTokens  int
	MaxOutputTokens int
	MaxTotalTokens  int
	MaxCostUSD      float64
}
```
### TokenUsage

```go
type TokenUsage struct {
	InputTokens  int
	OutputTokens int
	TotalTokens  int
	CostUSD      float64
}

func (u TokenUsage) Add(other TokenUsage) TokenUsage
```
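`Add` lends itself to aggregating usage across the LLM calls in a run. The sketch below reimplements the documented signature field-wise on a local stand-in; the real implementation may differ in detail:

```go
package main

import "fmt"

// Local stand-in for core.TokenUsage with a field-wise Add matching the
// documented signature (sketch only).
type TokenUsage struct {
	InputTokens  int
	OutputTokens int
	TotalTokens  int
	CostUSD      float64
}

func (u TokenUsage) Add(other TokenUsage) TokenUsage {
	return TokenUsage{
		InputTokens:  u.InputTokens + other.InputTokens,
		OutputTokens: u.OutputTokens + other.OutputTokens,
		TotalTokens:  u.TotalTokens + other.TotalTokens,
		CostUSD:      u.CostUSD + other.CostUSD,
	}
}

func main() {
	// Aggregate usage across the LLM calls in a run.
	perCall := []TokenUsage{
		{InputTokens: 120, OutputTokens: 40, TotalTokens: 160, CostUSD: 0.002},
		{InputTokens: 300, OutputTokens: 90, TotalTokens: 390, CostUSD: 0.006},
	}
	var total TokenUsage
	for _, u := range perCall {
		total = total.Add(u)
	}
	fmt.Println(total.TotalTokens) // 550
}
```

Note that `Add` is a value-receiver method returning a new struct, so accumulation requires reassignment (`total = total.Add(u)`).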