Caching & Memory

Caching reduces latency and cost by reusing results from expensive operations. Memory management ensures conversation context and state persist across nodes and sessions. This guide covers cache strategies, store backends, and memory patterns for PetalFlow workflows.

CacheNode wraps expensive operations to store and retrieve results:

cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{
    Store:     petalflow.NewMemoryCacheStore(),
    TTL:       1 * time.Hour,
    InputKeys: []string{"text_embedding"},
    CacheKey:  "embed:{{.Vars.document_id}}",
})
Field         Type                    Description
Store         CacheStore              Backend storage (memory, Redis, etc.)
TTL           time.Duration           Time-to-live for cached entries
CacheKey      string                  Template for generating cache keys
InputKeys     []string                Envelope keys to cache
KeyBuilder    func(*Envelope) string  Custom key generation function
Enabled       bool                    Enable/disable caching (default: true)
RefreshOnHit  bool                    Reset TTL on cache hit
Namespace     string                  Key prefix for isolation
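
Several of these fields combine naturally. For instance, a sketch of a cache that is namespaced, can be switched off via an environment variable (the variable name is illustrative), and uses a custom key builder:

cache := petalflow.NewCacheNode("profile_cache", petalflow.CacheNodeConfig{
    Store:        petalflow.NewMemoryCacheStore(),
    TTL:          30 * time.Minute,
    Namespace:    "profiles:",                            // isolate keys from other caches
    Enabled:      os.Getenv("PETALFLOW_CACHE_OFF") == "", // illustrative kill switch
    RefreshOnHit: true,                                   // keep hot entries alive
    InputKeys:    []string{"profile"},
    KeyBuilder: func(env *petalflow.Envelope) string {
        return "user:" + env.GetVarString("user_id")
    },
})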

Fast, single-instance cache for development and simple deployments:

store := petalflow.NewMemoryCacheStore()

// With options
store = petalflow.NewMemoryCacheStore(
    petalflow.WithMaxEntries(10000),
    petalflow.WithCleanupInterval(5 * time.Minute),
)

Distributed cache for production deployments:

import "github.com/redis/go-redis/v9"
redisClient := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: os.Getenv("REDIS_PASSWORD"),
DB: 0,
})
store := petalflow.NewRedisCacheStore(redisClient)
// With namespace
store := petalflow.NewRedisCacheStore(redisClient,
petalflow.WithNamespace("petalflow:cache:"),
)

Implement the CacheStore interface for custom backends:

type CacheStore interface {
    Get(ctx context.Context, key string) (any, bool, error)
    Set(ctx context.Context, key string, value any, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
    Clear(ctx context.Context) error
}

// Example: S3-backed cache for large objects
type S3CacheStore struct {
    client *s3.Client
    bucket string
}

func (s *S3CacheStore) Get(ctx context.Context, key string) (any, bool, error) {
    result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        var notFound *types.NoSuchKey
        if errors.As(err, &notFound) {
            return nil, false, nil // cache miss, not an error
        }
        return nil, false, err
    }
    defer result.Body.Close()
    var value any
    if err := json.NewDecoder(result.Body).Decode(&value); err != nil {
        return nil, false, err
    }
    return value, true, nil
}

func (s *S3CacheStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }
    // Note: S3 has no per-object TTL; expire entries with a bucket lifecycle rule instead.
    _, err = s.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(data),
    })
    return err
}
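
To satisfy the rest of the interface, Delete and Clear might look like the following sketch (clearing by listing the whole bucket is an assumption; adapt it to how you partition cache entries):

func (s *S3CacheStore) Delete(ctx context.Context, key string) error {
    _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    return err
}

func (s *S3CacheStore) Clear(ctx context.Context) error {
    // List and delete every object in the bucket; fine for small caches,
    // consider a lifecycle rule or a dedicated bucket for large ones.
    paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
        Bucket: aws.String(s.bucket),
    })
    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            return err
        }
        for _, obj := range page.Contents {
            if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
                Bucket: aws.String(s.bucket),
                Key:    obj.Key,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}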

Effective cache keys ensure high hit rates while avoiding stale data.

Use Go templates to build keys from envelope data:

// Simple key from single variable
cache := petalflow.NewCacheNode("user_cache", petalflow.CacheNodeConfig{
    CacheKey: "user:{{.Vars.user_id}}",
    // ...
})

// Composite key from multiple variables
cache := petalflow.NewCacheNode("search_cache", petalflow.CacheNodeConfig{
    CacheKey: "search:{{.Vars.query}}:{{.Vars.filters}}:{{.Vars.page}}",
    // ...
})

// Include model version for embeddings
cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{
    CacheKey: "embed:{{.Vars.model}}:{{.Vars.document_hash}}",
    // ...
})

For complex key generation logic:

cache := petalflow.NewCacheNode("smart_cache", petalflow.CacheNodeConfig{
    Store: store,
    TTL:   30 * time.Minute,
    KeyBuilder: func(env *petalflow.Envelope) string {
        // Normalize query for better cache hits
        query := strings.ToLower(strings.TrimSpace(env.GetVarString("query")))
        // Hash long queries
        if len(query) > 100 {
            hash := sha256.Sum256([]byte(query))
            query = hex.EncodeToString(hash[:16])
        }
        // Include relevant context
        userID := env.GetVarString("user_id")
        locale := env.GetVarString("locale")
        return fmt.Sprintf("query:%s:%s:%s", userID, locale, query)
    },
})

Improve hit rates by normalizing inputs:

func normalizedKeyBuilder(env *petalflow.Envelope) string {
    query := env.GetVarString("search_query")
    // Lowercase
    query = strings.ToLower(query)
    // Collapse extra whitespace
    query = strings.Join(strings.Fields(query), " ")
    // Sort filter parameters for consistent ordering
    // (comma-ok assertion avoids a panic when filters are missing)
    params, _ := env.GetVar("filters").(map[string]string)
    keys := make([]string, 0, len(params))
    for k := range params {
        keys = append(keys, k)
    }
    sort.Strings(keys)
    var filterParts []string
    for _, k := range keys {
        filterParts = append(filterParts, fmt.Sprintf("%s=%s", k, params[k]))
    }
    return fmt.Sprintf("search:%s:%s", query, strings.Join(filterParts, "&"))
}

Choose TTL based on data characteristics:

Data Type            Recommended TTL  Rationale
Embeddings           24h - 7d         Rarely change for same input
Search results       5m - 1h          Balance freshness vs. cost
LLM classifications  1h - 24h         Stable for same input
User preferences     15m - 1h         May change during session
Real-time data       No cache         Always needs fresh data
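
Applied to the CacheNode config, those recommendations look roughly like this (node names and keys are illustrative):

embedCache := petalflow.NewCacheNode("embed_cache", petalflow.CacheNodeConfig{
    Store:     store,
    TTL:       72 * time.Hour, // embeddings: rarely change for the same input
    CacheKey:  "embed:{{.Vars.model}}:{{.Vars.document_hash}}",
    InputKeys: []string{"text_embedding"},
})

searchCache := petalflow.NewCacheNode("search_cache", petalflow.CacheNodeConfig{
    Store:     store,
    TTL:       15 * time.Minute, // search results: balance freshness vs. cost
    CacheKey:  "search:{{.Vars.query}}",
    InputKeys: []string{"search_results"},
})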

Set TTL based on response characteristics:

cache := petalflow.NewCacheNode("adaptive_cache", petalflow.CacheNodeConfig{
    Store: store,
    TTLFunc: func(env *petalflow.Envelope, value any) time.Duration {
        // Cache successful results longer
        if result, ok := value.(map[string]any); ok {
            if result["success"] == true {
                return 1 * time.Hour
            }
            // Cache errors briefly
            return 1 * time.Minute
        }
        return 15 * time.Minute
    },
})

Keep frequently accessed data warm:

cache := petalflow.NewCacheNode("hot_cache", petalflow.CacheNodeConfig{
    Store:        store,
    TTL:          30 * time.Minute,
    RefreshOnHit: true, // Reset TTL on each access
})

Cache-aside is the standard pattern: check the cache before computing, and write the result back after a miss:

g := petalflow.NewGraph("cache-aside")

// Check cache first
g.AddNode(petalflow.NewCacheNode("check_cache", petalflow.CacheNodeConfig{
    Store:     store,
    CacheKey:  "result:{{.Vars.input_hash}}",
    InputKeys: []string{"cached_result"},
    Mode:      petalflow.CacheModeRead,
}))

// Route based on cache hit
g.AddNode(petalflow.NewRuleRouter("cache_check", petalflow.RuleRouterConfig{
    Routes: []petalflow.RouteRule{
        {When: petalflow.RouteCondition{Var: "cached_result", Op: petalflow.OpNotEmpty}, To: "use_cached"},
    },
    Default: "compute",
}))

// Compute if cache miss
g.AddNode(petalflow.NewLLMNode("compute", client, computeConfig))

// Write to cache after compute
g.AddNode(petalflow.NewCacheNode("write_cache", petalflow.CacheNodeConfig{
    Store:     store,
    CacheKey:  "result:{{.Vars.input_hash}}",
    InputKeys: []string{"computed_result"},
    Mode:      petalflow.CacheModeWrite,
    TTL:       1 * time.Hour,
}))

Write-through caches the result inline with computation:

g := petalflow.NewGraph("write-through")

// Compute
g.AddNode(petalflow.NewLLMNode("generate", client, generateConfig))

// Cache the result (inline, no routing needed)
g.AddNode(petalflow.NewCacheNode("cache_result", petalflow.CacheNodeConfig{
    Store:     store,
    CacheKey:  "generated:{{.Vars.prompt_hash}}",
    InputKeys: []string{"generated_response"},
    TTL:       2 * time.Hour,
}))

g.AddEdge("generate", "cache_result")

Combine a fast in-process cache with a distributed cache:

memoryStore := petalflow.NewMemoryCacheStore()
redisStore := petalflow.NewRedisCacheStore(redisClient)

multiStore := petalflow.NewMultiLevelCacheStore(
    memoryStore, // L1: fast, local
    redisStore,  // L2: shared, distributed
)

cache := petalflow.NewCacheNode("multi_cache", petalflow.CacheNodeConfig{
    Store:     multiStore,
    CacheKey:  "data:{{.Vars.key}}",
    InputKeys: []string{"result"},
    TTL:       1 * time.Hour,
})
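
Conceptually, the read path checks L1 first, falls back to L2, and backfills L1 on a hit. A minimal sketch of that logic against the CacheStore interface above (TwoLevelStore and its backfill TTL are illustrative, not the library's built-in implementation):

type TwoLevelStore struct {
    l1, l2      petalflow.CacheStore
    backfillTTL time.Duration
}

func (t *TwoLevelStore) Get(ctx context.Context, key string) (any, bool, error) {
    // L1 first: fast, local
    if v, ok, err := t.l1.Get(ctx, key); err == nil && ok {
        return v, true, nil
    }
    // Fall back to L2 (shared), then backfill L1 so the next read stays local
    v, ok, err := t.l2.Get(ctx, key)
    if err != nil || !ok {
        return nil, false, err
    }
    _ = t.l1.Set(ctx, key, v, t.backfillTTL) // best-effort backfill
    return v, true, nil
}

func (t *TwoLevelStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    // Write both levels so local readers see fresh data immediately
    if err := t.l2.Set(ctx, key, value, ttl); err != nil {
        return err
    }
    return t.l1.Set(ctx, key, value, ttl)
}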

Delete specific cache entries:

// In a transform node or custom logic
func invalidateCache(ctx context.Context, store petalflow.CacheStore, keys []string) error {
    for _, key := range keys {
        if err := store.Delete(ctx, key); err != nil {
            return err
        }
    }
    return nil
}

// Invalidate when data changes
invalidator := petalflow.NewTransformNode("invalidate", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
        userID := inputs["user_id"].(string)
        keys := []string{
            fmt.Sprintf("user:%s:profile", userID),
            fmt.Sprintf("user:%s:preferences", userID),
        }
        // ctx and store are captured from the enclosing scope
        return nil, invalidateCache(ctx, store, keys)
    },
    InputKeys: []string{"user_id"},
})

Clear all keys matching a pattern (Redis):

func invalidatePattern(ctx context.Context, client *redis.Client, pattern string) error {
    iter := client.Scan(ctx, 0, pattern, 100).Iterator()
    for iter.Next(ctx) {
        if err := client.Del(ctx, iter.Val()).Err(); err != nil {
            return err
        }
    }
    return iter.Err()
}

// Invalidate all cache entries for a user
invalidatePattern(ctx, redisClient, "petalflow:cache:user:123:*")
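
For large key spaces, one DEL per round trip is slow. A sketch that batches deletions through a go-redis pipeline (the batch size is arbitrary):

func invalidatePatternBatched(ctx context.Context, client *redis.Client, pattern string) error {
    iter := client.Scan(ctx, 0, pattern, 1000).Iterator()
    pipe := client.Pipeline()
    queued := 0
    for iter.Next(ctx) {
        pipe.Del(ctx, iter.Val())
        queued++
        if queued >= 500 {
            // Flush the pipeline periodically to bound memory use
            if _, err := pipe.Exec(ctx); err != nil {
                return err
            }
            queued = 0
        }
    }
    if queued > 0 {
        if _, err := pipe.Exec(ctx); err != nil {
            return err
        }
    }
    return iter.Err()
}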

Use versioned keys to invalidate entire cache generations:

// Store version in config or database
cacheVersion := "v2"

cache := petalflow.NewCacheNode("versioned_cache", petalflow.CacheNodeConfig{
    Store:     store,
    CacheKey:  fmt.Sprintf("%s:data:{{.Vars.key}}", cacheVersion),
    InputKeys: []string{"result"},
    TTL:       24 * time.Hour,
})

// To invalidate: increment cacheVersion to "v3"
// Old keys will expire naturally
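
If the version can change while the process is running, a KeyBuilder keeps it dynamic instead of baking it into the key template (loadCacheVersion is a hypothetical helper):

cache := petalflow.NewCacheNode("versioned_cache", petalflow.CacheNodeConfig{
    Store:     store,
    TTL:       24 * time.Hour,
    InputKeys: []string{"result"},
    KeyBuilder: func(env *petalflow.Envelope) string {
        // loadCacheVersion is a hypothetical helper that reads the current
        // version from config or a database; bumping it retires the whole generation.
        return fmt.Sprintf("%s:data:%s", loadCacheVersion(), env.GetVarString("key"))
    },
})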

Maintain conversation context across multiple turns.

// Initialize conversation
env := petalflow.NewEnvelope()
env.SetMessages([]petalflow.Message{})

// Add system message
env.AddMessage(petalflow.Message{
    Role:    "system",
    Content: "You are a helpful assistant.",
})

// Process turns
for _, userInput := range conversation {
    env.AddMessage(petalflow.Message{
        Role:    "user",
        Content: userInput,
    })
    result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{})
    if err != nil {
        return err
    }
    // LLMNode automatically adds the assistant response to messages
    assistantResponse := result.GetVar("response").(string)
    fmt.Println("assistant:", assistantResponse)
}

Keep only recent messages to manage context length:

memoryManager := petalflow.NewTransformNode("manage_memory", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
        messages := inputs["messages"].([]petalflow.Message)
        maxMessages := 20
        if len(messages) > maxMessages {
            // Keep system message + recent messages
            systemMsg := messages[0]
            recentMsgs := messages[len(messages)-(maxMessages-1):]
            messages = append([]petalflow.Message{systemMsg}, recentMsgs...)
        }
        return messages, nil
    },
    InputKeys: []string{"messages"},
    OutputKey: "messages",
})

Summarize older messages to preserve context while reducing tokens:

g := petalflow.NewGraph("summarized-memory")

// Check if summarization is needed
g.AddNode(petalflow.NewRuleRouter("memory_check", petalflow.RuleRouterConfig{
    Routes: []petalflow.RouteRule{
        {When: petalflow.RouteCondition{Var: "message_count", Op: petalflow.OpGt, Value: 30}, To: "summarize"},
    },
    Default: "skip_summarize",
}))

// Summarize old messages
g.AddNode(petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{
    Model: "gpt-4o-mini",
    PromptTemplate: `Summarize this conversation history concisely:
{{range .Messages}}{{.Role}}: {{.Content}}
{{end}}
Preserve key facts, decisions, and context.`,
    OutputKey: "conversation_summary",
}))

// Replace old messages with summary
g.AddNode(petalflow.NewTransformNode("compact_memory", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
        summary := inputs["conversation_summary"].(string)
        recentMessages := inputs["recent_messages"].([]petalflow.Message)
        // Create new message history with summary
        messages := []petalflow.Message{
            {Role: "system", Content: "Previous conversation summary: " + summary},
        }
        messages = append(messages, recentMessages...)
        return messages, nil
    },
}))

Store conversations for resumption:

type ConversationStore interface {
    Save(ctx context.Context, sessionID string, env *petalflow.Envelope) error
    Load(ctx context.Context, sessionID string) (*petalflow.Envelope, error)
    Delete(ctx context.Context, sessionID string) error
}

// Redis-backed conversation store
type RedisConversationStore struct {
    client *redis.Client
    ttl    time.Duration
}

func (s *RedisConversationStore) Save(ctx context.Context, sessionID string, env *petalflow.Envelope) error {
    data, err := json.Marshal(env.Export())
    if err != nil {
        return err
    }
    return s.client.Set(ctx, "conversation:"+sessionID, data, s.ttl).Err()
}

func (s *RedisConversationStore) Load(ctx context.Context, sessionID string) (*petalflow.Envelope, error) {
    data, err := s.client.Get(ctx, "conversation:"+sessionID).Bytes()
    if err != nil {
        if err == redis.Nil {
            return petalflow.NewEnvelope(), nil
        }
        return nil, err
    }
    var exported map[string]any
    if err := json.Unmarshal(data, &exported); err != nil {
        return nil, err
    }
    return petalflow.ImportEnvelope(exported), nil
}
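
Wired into a request handler, the flow is load, run, save. A minimal sketch, assuming runtime.Run returns the updated *petalflow.Envelope as in the multi-turn example above (handleTurn and the run closure are illustrative):

// run wraps runtime.Run for a fixed graph, e.g.:
//   run := func(ctx context.Context, env *petalflow.Envelope) (*petalflow.Envelope, error) {
//       return runtime.Run(ctx, graph, env, petalflow.RunOptions{})
//   }
func handleTurn(ctx context.Context, convs *RedisConversationStore,
    run func(context.Context, *petalflow.Envelope) (*petalflow.Envelope, error),
    sessionID, userInput string) (string, error) {

    // Resume (or start) the conversation for this session
    env, err := convs.Load(ctx, sessionID)
    if err != nil {
        return "", err
    }
    env.AddMessage(petalflow.Message{Role: "user", Content: userInput})

    result, err := run(ctx, env)
    if err != nil {
        return "", err
    }

    // Persist the updated conversation before replying
    if err := convs.Save(ctx, sessionID, result); err != nil {
        return "", err
    }
    reply, _ := result.GetVar("response").(string)
    return reply, nil
}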

Pre-populate cache for expected requests:

// WarmItem pairs a cache key with the value and TTL to preload
type WarmItem struct {
    Key   string
    Value any
    TTL   time.Duration
}

func warmCache(ctx context.Context, store petalflow.CacheStore, items []WarmItem) error {
    for _, item := range items {
        if err := store.Set(ctx, item.Key, item.Value, item.TTL); err != nil {
            return err
        }
    }
    return nil
}

// Warm cache on startup
popularQueries := []WarmItem{
    {Key: "faq:pricing", Value: pricingResponse, TTL: 24 * time.Hour},
    {Key: "faq:features", Value: featuresResponse, TTL: 24 * time.Hour},
}
warmCache(ctx, store, popularQueries)

Monitor cache performance:

type MetricsCacheStore struct {
    underlying petalflow.CacheStore
    hits       *prometheus.CounterVec
    misses     *prometheus.CounterVec
    latency    *prometheus.HistogramVec
}

func (m *MetricsCacheStore) Get(ctx context.Context, key string) (any, bool, error) {
    start := time.Now()
    value, found, err := m.underlying.Get(ctx, key)
    labels := prometheus.Labels{"operation": "get"}
    m.latency.With(labels).Observe(time.Since(start).Seconds())
    if found {
        m.hits.With(labels).Inc()
    } else {
        m.misses.With(labels).Inc()
    }
    return value, found, err
}
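
The write path is a straightforward pass-through with the same timing; a sketch for Set, with Delete and Clear delegating directly (they follow the same shape if you want them timed too):

func (m *MetricsCacheStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    start := time.Now()
    err := m.underlying.Set(ctx, key, value, ttl)
    m.latency.With(prometheus.Labels{"operation": "set"}).Observe(time.Since(start).Seconds())
    return err
}

func (m *MetricsCacheStore) Delete(ctx context.Context, key string) error {
    return m.underlying.Delete(ctx, key)
}

func (m *MetricsCacheStore) Clear(ctx context.Context) error {
    return m.underlying.Clear(ctx)
}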