# Caching & Memory
Caching reduces latency and cost by reusing results from expensive operations. Memory management ensures conversation context and state persist across nodes and sessions. This guide covers cache strategies, store backends, and memory patterns for PetalFlow workflows.
## Cache Node Basics
CacheNode wraps expensive operations to store and retrieve results:
cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{ Store: petalflow.NewMemoryCacheStore(), TTL: 1 * time.Hour, InputKeys: []string{"text_embedding"}, CacheKey: "embed:{{.Vars.document_id}}",})Configuration Options
Section titled “Configuration Options”| Field | Type | Description |
|---|---|---|
Store | CacheStore | Backend storage (memory, Redis, etc.) |
TTL | time.Duration | Time-to-live for cached entries |
CacheKey | string | Template for generating cache keys |
InputKeys | []string | Envelope keys to cache |
KeyBuilder | func(*Envelope) string | Custom key generation function |
Enabled | bool | Enable/disable caching (default: true) |
RefreshOnHit | bool | Reset TTL on cache hit |
Namespace | string | Key prefix for isolation |
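As a sketch of how these options combine, the following node isolates its keys under a namespace and keeps hot entries alive. The node name, namespace value, and `report_id` variable are illustrative, not part of PetalFlow; `store` is any CacheStore from the next section.

```go
// Sketch: combining Namespace, RefreshOnHit, and a template key.
// The namespace and variable names are illustrative.
cache := petalflow.NewCacheNode("report_cache", petalflow.CacheNodeConfig{
    Store:        store,            // any CacheStore (see Cache Stores below)
    Namespace:    "prod:reports:",  // prefix isolates keys per environment
    TTL:          6 * time.Hour,
    RefreshOnHit: true,             // frequently read entries stay warm
    InputKeys:    []string{"report"},
    CacheKey:     "report:{{.Vars.report_id}}",
})
```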
## Cache Stores
### In-Memory Store
Fast, single-instance cache for development and simple deployments:
```go
store := petalflow.NewMemoryCacheStore()
```

```go
// With options
store := petalflow.NewMemoryCacheStore(
    petalflow.WithMaxEntries(10000),
    petalflow.WithCleanupInterval(5 * time.Minute),
)
```

### Redis Store
Distributed cache for production deployments:
import "github.com/redis/go-redis/v9"
redisClient := redis.NewClient(&redis.Options{ Addr: "localhost:6379", Password: os.Getenv("REDIS_PASSWORD"), DB: 0,})
store := petalflow.NewRedisCacheStore(redisClient)
// With namespacestore := petalflow.NewRedisCacheStore(redisClient, petalflow.WithNamespace("petalflow:cache:"),)Custom Cache Store
Implement the CacheStore interface for custom backends:
```go
type CacheStore interface {
    Get(ctx context.Context, key string) (any, bool, error)
    Set(ctx context.Context, key string, value any, ttl time.Duration) error
    Delete(ctx context.Context, key string) error
    Clear(ctx context.Context) error
}

// Example: S3-backed cache for large objects
type S3CacheStore struct {
    client *s3.Client
    bucket string
}

func (s *S3CacheStore) Get(ctx context.Context, key string) (any, bool, error) {
    result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    if err != nil {
        // Treat a missing key as a cache miss, not an error.
        var notFound *types.NoSuchKey
        if errors.As(err, &notFound) {
            return nil, false, nil
        }
        return nil, false, err
    }
    defer result.Body.Close()

    var value any
    if err := json.NewDecoder(result.Body).Decode(&value); err != nil {
        return nil, false, err
    }
    return value, true, nil
}

func (s *S3CacheStore) Set(ctx context.Context, key string, value any, ttl time.Duration) error {
    data, err := json.Marshal(value)
    if err != nil {
        return err
    }

    // Note: S3 has no native per-object TTL; use a bucket lifecycle
    // rule to expire entries, or store the deadline with the value.
    _, err = s.client.PutObject(ctx, &s3.PutObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
        Body:   bytes.NewReader(data),
    })
    return err
}
```
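The interface also requires Delete and Clear. A minimal sketch for the S3 store using the AWS SDK v2 pagination helpers; the assumption that the whole bucket is dedicated to cache entries is ours, not PetalFlow's:

```go
func (s *S3CacheStore) Delete(ctx context.Context, key string) error {
    _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
        Bucket: aws.String(s.bucket),
        Key:    aws.String(key),
    })
    return err
}

func (s *S3CacheStore) Clear(ctx context.Context) error {
    // List and delete every object in the bucket, page by page.
    paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
        Bucket: aws.String(s.bucket),
    })
    for paginator.HasMorePages() {
        page, err := paginator.NextPage(ctx)
        if err != nil {
            return err
        }
        for _, obj := range page.Contents {
            if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
                Bucket: aws.String(s.bucket),
                Key:    obj.Key,
            }); err != nil {
                return err
            }
        }
    }
    return nil
}
```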
## Cache Key Strategies

Effective cache keys ensure high hit rates while avoiding stale data.
### Template-Based Keys
Use Go templates to build keys from envelope data:
```go
// Simple key from a single variable
cache := petalflow.NewCacheNode("user_cache", petalflow.CacheNodeConfig{
    CacheKey: "user:{{.Vars.user_id}}",
    // ...
})

// Composite key from multiple variables
cache := petalflow.NewCacheNode("search_cache", petalflow.CacheNodeConfig{
    CacheKey: "search:{{.Vars.query}}:{{.Vars.filters}}:{{.Vars.page}}",
    // ...
})

// Include the model version for embeddings
cache := petalflow.NewCacheNode("embedding_cache", petalflow.CacheNodeConfig{
    CacheKey: "embed:{{.Vars.model}}:{{.Vars.document_hash}}",
    // ...
})
```

### Custom Key Builder
For complex key generation logic:
cache := petalflow.NewCacheNode("smart_cache", petalflow.CacheNodeConfig{ Store: store, TTL: 30 * time.Minute, KeyBuilder: func(env *petalflow.Envelope) string { // Normalize query for better cache hits query := strings.ToLower(strings.TrimSpace(env.GetVarString("query")))
// Hash long queries if len(query) > 100 { hash := sha256.Sum256([]byte(query)) query = hex.EncodeToString(hash[:16]) }
// Include relevant context userID := env.GetVarString("user_id") locale := env.GetVarString("locale")
return fmt.Sprintf("query:%s:%s:%s", userID, locale, query) },})Key Normalization
Improve hit rates by normalizing inputs:
```go
func normalizedKeyBuilder(env *petalflow.Envelope) string {
    query := env.GetVarString("search_query")

    // Lowercase
    query = strings.ToLower(query)

    // Remove extra whitespace
    query = strings.Join(strings.Fields(query), " ")

    // Sort query parameters for consistent ordering.
    // The comma-ok assertion avoids a panic if "filters" is unset.
    params, _ := env.GetVar("filters").(map[string]string)
    keys := make([]string, 0, len(params))
    for k := range params {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    var filterParts []string
    for _, k := range keys {
        filterParts = append(filterParts, fmt.Sprintf("%s=%s", k, params[k]))
    }

    return fmt.Sprintf("search:%s:%s", query, strings.Join(filterParts, "&"))
}
```

## TTL Strategies
Choose TTL based on data characteristics:
| Data Type | Recommended TTL | Rationale |
|---|---|---|
| Embeddings | 24h - 7d | Rarely change for same input |
| Search results | 5m - 1h | Balance freshness vs. cost |
| LLM classifications | 1h - 24h | Stable for same input |
| User preferences | 15m - 1h | May change during session |
| Real-time data | No cache | Always needs fresh data |
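As a sketch, these ranges might translate into per-node TTLs like the following; the node names, durations, and key templates are illustrative:

```go
// Long TTL: embeddings are stable for the same input and model.
embedCache := petalflow.NewCacheNode("embed_cache", petalflow.CacheNodeConfig{
    Store:    store,
    TTL:      72 * time.Hour,
    CacheKey: "embed:{{.Vars.model}}:{{.Vars.document_hash}}",
})

// Short TTL: search results trade freshness against cost.
searchCache := petalflow.NewCacheNode("search_cache", petalflow.CacheNodeConfig{
    Store:    store,
    TTL:      10 * time.Minute,
    CacheKey: "search:{{.Vars.query}}",
})

// Real-time data: disable caching entirely.
liveCache := petalflow.NewCacheNode("live_data", petalflow.CacheNodeConfig{
    Store:   store,
    Enabled: false,
})
```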
### Conditional TTL
Set TTL based on response characteristics:
cache := petalflow.NewCacheNode("adaptive_cache", petalflow.CacheNodeConfig{ Store: store, TTLFunc: func(env *petalflow.Envelope, value any) time.Duration { // Cache successful results longer if result, ok := value.(map[string]any); ok { if result["success"] == true { return 1 * time.Hour } // Cache errors briefly return 1 * time.Minute } return 15 * time.Minute },})Refresh on Hit
Keep frequently accessed data warm:
cache := petalflow.NewCacheNode("hot_cache", petalflow.CacheNodeConfig{ Store: store, TTL: 30 * time.Minute, RefreshOnHit: true, // Reset TTL on each access})Cache Patterns
### Cache-Aside Pattern
The standard pattern, where you check the cache before computing (node registration below; edge wiring follows the snippet):
g := petalflow.NewGraph("cache-aside")
// Check cache firstg.AddNode(petalflow.NewCacheNode("check_cache", petalflow.CacheNodeConfig{ Store: store, CacheKey: "result:{{.Vars.input_hash}}", InputKeys: []string{"cached_result"}, Mode: petalflow.CacheModeRead,}))
// Route based on cache hitg.AddNode(petalflow.NewRuleRouter("cache_check", petalflow.RuleRouterConfig{ Routes: []petalflow.RouteRule{ {When: petalflow.RouteCondition{Var: "cached_result", Op: petalflow.OpNotEmpty}, To: "use_cached"}, }, Default: "compute",}))
// Compute if cache missg.AddNode(petalflow.NewLLMNode("compute", client, computeConfig))
// Write to cache after computeg.AddNode(petalflow.NewCacheNode("write_cache", petalflow.CacheNodeConfig{ Store: store, CacheKey: "result:{{.Vars.input_hash}}", InputKeys: []string{"computed_result"}, Mode: petalflow.CacheModeWrite, TTL: 1 * time.Hour,}))Write-Through Pattern
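The snippet above only registers the nodes. One plausible wiring, assuming the runtime follows the router's To/Default targets and that `use_cached` is a node that surfaces the cached value downstream (both assumptions ours):

```go
// Wiring sketch: the reader feeds the router; misses go through
// compute and are then written back to the cache.
g.AddEdge("check_cache", "cache_check")
g.AddEdge("compute", "write_cache")
```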
### Write-Through Pattern

Cache inline with computation:
g := petalflow.NewGraph("write-through")
// Computeg.AddNode(petalflow.NewLLMNode("generate", client, generateConfig))
// Cache the result (inline, no routing needed)g.AddNode(petalflow.NewCacheNode("cache_result", petalflow.CacheNodeConfig{ Store: store, CacheKey: "generated:{{.Vars.prompt_hash}}", InputKeys: []string{"generated_response"}, TTL: 2 * time.Hour,}))
g.AddEdge("generate", "cache_result")Multi-Level Cache
Combine a fast local cache with a distributed cache:
```go
memoryStore := petalflow.NewMemoryCacheStore()
redisStore := petalflow.NewRedisCacheStore(redisClient)

multiStore := petalflow.NewMultiLevelCacheStore(
    memoryStore, // L1: fast, local
    redisStore,  // L2: shared, distributed
)

cache := petalflow.NewCacheNode("multi_cache", petalflow.CacheNodeConfig{
    Store:     multiStore,
    CacheKey:  "data:{{.Vars.key}}",
    InputKeys: []string{"result"},
    TTL:       1 * time.Hour,
})
```
Section titled “Cache Invalidation”Manual Invalidation
Delete specific cache entries:
```go
// In a transform node or custom logic
func invalidateCache(ctx context.Context, store petalflow.CacheStore, keys []string) error {
    for _, key := range keys {
        if err := store.Delete(ctx, key); err != nil {
            return err
        }
    }
    return nil
}

// Invalidate when data changes
invalidator := petalflow.NewTransformNode("invalidate", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
        userID := inputs["user_id"].(string)
        keys := []string{
            fmt.Sprintf("user:%s:profile", userID),
            fmt.Sprintf("user:%s:preferences", userID),
        }
        // Return the error through the error slot, not the value slot.
        return nil, invalidateCache(context.Background(), store, keys)
    },
})
```

### Pattern-Based Invalidation
Clear all keys matching a pattern (Redis):
```go
func invalidatePattern(ctx context.Context, client *redis.Client, pattern string) error {
    iter := client.Scan(ctx, 0, pattern, 100).Iterator()
    for iter.Next(ctx) {
        if err := client.Del(ctx, iter.Val()).Err(); err != nil {
            return err
        }
    }
    return iter.Err()
}

// Invalidate all cache entries for a user
invalidatePattern(ctx, redisClient, "petalflow:cache:user:123:*")
```

### Time-Based Invalidation
Use versioned keys to invalidate entire cache generations:
```go
// Store the version in config or a database
cacheVersion := "v2"

cache := petalflow.NewCacheNode("versioned_cache", petalflow.CacheNodeConfig{
    Store:     store,
    CacheKey:  fmt.Sprintf("%s:data:{{.Vars.key}}", cacheVersion),
    InputKeys: []string{"result"},
    TTL:       24 * time.Hour,
})

// To invalidate: increment cacheVersion to "v3".
// Old keys will expire naturally.
```

## Conversation Memory
Maintain conversation context across multiple turns.
### Message History in Envelope
```go
// Initialize conversation
env := petalflow.NewEnvelope()
env.SetMessages([]petalflow.Message{})

// Add system message
env.AddMessage(petalflow.Message{
    Role:    "system",
    Content: "You are a helpful assistant.",
})

// Process turns
for _, userInput := range conversation {
    env.AddMessage(petalflow.Message{
        Role:    "user",
        Content: userInput,
    })

    result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{})
    if err != nil {
        return err
    }

    // LLMNode automatically adds the assistant response to messages
    assistantResponse := result.GetVar("response").(string)
    fmt.Println("assistant:", assistantResponse)
}
```

### Sliding Window Memory
Keep only recent messages to manage context length:
```go
memoryManager := petalflow.NewTransformNode("manage_memory", petalflow.TransformNodeConfig{
    Transform: func(inputs map[string]any) (any, error) {
        messages := inputs["messages"].([]petalflow.Message)

        maxMessages := 20
        if len(messages) > maxMessages {
            // Keep system message + recent messages
            systemMsg := messages[0]
            recentMsgs := messages[len(messages)-(maxMessages-1):]
            messages = append([]petalflow.Message{systemMsg}, recentMsgs...)
        }

        return messages, nil
    },
    InputKeys: []string{"messages"},
    OutputKey: "messages",
})
```

### Summarization-Based Memory
Summarize older messages to preserve context while reducing tokens:
g := petalflow.NewGraph("summarized-memory")
// Check if summarization is neededg.AddNode(petalflow.NewRuleRouter("memory_check", petalflow.RuleRouterConfig{ Routes: []petalflow.RouteRule{ {When: petalflow.RouteCondition{Var: "message_count", Op: petalflow.OpGt, Value: 30}, To: "summarize"}, }, Default: "skip_summarize",}))
// Summarize old messagesg.AddNode(petalflow.NewLLMNode("summarize", client, petalflow.LLMNodeConfig{ Model: "gpt-4o-mini", PromptTemplate: `Summarize this conversation history concisely:
{{range .Messages}}{{.Role}}: {{.Content}}{{end}}
Preserve key facts, decisions, and context.`, OutputKey: "conversation_summary",}))
// Replace old messages with summaryg.AddNode(petalflow.NewTransformNode("compact_memory", petalflow.TransformNodeConfig{ Transform: func(inputs map[string]any) (any, error) { summary := inputs["conversation_summary"].(string) recentMessages := inputs["recent_messages"].([]petalflow.Message)
// Create new message history with summary messages := []petalflow.Message{ {Role: "system", Content: "Previous conversation summary: " + summary}, } messages = append(messages, recentMessages...) return messages, nil },}))Persistent Conversation Storage
### Persistent Conversation Storage

Store conversations for resumption:
```go
type ConversationStore interface {
    Save(ctx context.Context, sessionID string, env *petalflow.Envelope) error
    Load(ctx context.Context, sessionID string) (*petalflow.Envelope, error)
    Delete(ctx context.Context, sessionID string) error
}

// Redis-backed conversation store
type RedisConversationStore struct {
    client *redis.Client
    ttl    time.Duration
}

func (s *RedisConversationStore) Save(ctx context.Context, sessionID string, env *petalflow.Envelope) error {
    data, err := json.Marshal(env.Export())
    if err != nil {
        return err
    }
    return s.client.Set(ctx, "conversation:"+sessionID, data, s.ttl).Err()
}

func (s *RedisConversationStore) Load(ctx context.Context, sessionID string) (*petalflow.Envelope, error) {
    data, err := s.client.Get(ctx, "conversation:"+sessionID).Bytes()
    if err != nil {
        if err == redis.Nil {
            // No saved conversation: start fresh.
            return petalflow.NewEnvelope(), nil
        }
        return nil, err
    }

    var exported map[string]any
    if err := json.Unmarshal(data, &exported); err != nil {
        return nil, err
    }

    return petalflow.ImportEnvelope(exported), nil
}
```
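A usage sketch tying the store into a per-turn handler; `runtime` and `graph` are assumed to be set up as in the earlier examples, and the session handling is illustrative:

```go
func handleTurn(ctx context.Context, store ConversationStore, sessionID, userInput string) error {
    // Resume the conversation, or start fresh if none is saved.
    env, err := store.Load(ctx, sessionID)
    if err != nil {
        return err
    }

    env.AddMessage(petalflow.Message{Role: "user", Content: userInput})

    result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{})
    if err != nil {
        return err
    }

    // Persist the updated history for the next turn.
    return store.Save(ctx, sessionID, result)
}
```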
Section titled “Performance Optimization”Cache Warming
Pre-populate the cache for expected requests:
```go
// WarmItem pairs a cache key with a precomputed value and TTL.
type WarmItem struct {
    Key   string
    Value any
    TTL   time.Duration
}

func warmCache(ctx context.Context, store petalflow.CacheStore, items []WarmItem) error {
    for _, item := range items {
        if err := store.Set(ctx, item.Key, item.Value, item.TTL); err != nil {
            return err
        }
    }
    return nil
}

// Warm cache on startup
popularQueries := []WarmItem{
    {Key: "faq:pricing", Value: pricingResponse, TTL: 24 * time.Hour},
    {Key: "faq:features", Value: featuresResponse, TTL: 24 * time.Hour},
}
warmCache(ctx, store, popularQueries)
```

### Cache Metrics
Monitor cache performance:
```go
type MetricsCacheStore struct {
    underlying petalflow.CacheStore
    hits       *prometheus.CounterVec
    misses     *prometheus.CounterVec
    latency    *prometheus.HistogramVec
}

func (m *MetricsCacheStore) Get(ctx context.Context, key string) (any, bool, error) {
    start := time.Now()
    value, found, err := m.underlying.Get(ctx, key)

    labels := prometheus.Labels{"operation": "get"}
    m.latency.With(labels).Observe(time.Since(start).Seconds())

    if found {
        m.hits.With(labels).Inc()
    } else {
        m.misses.With(labels).Inc()
    }

    return value, found, err
}
```
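Constructing the wrapper and registering its collectors might look like the following; the metric names are illustrative, not part of PetalFlow:

```go
func NewMetricsCacheStore(underlying petalflow.CacheStore, reg prometheus.Registerer) *MetricsCacheStore {
    m := &MetricsCacheStore{
        underlying: underlying,
        hits: prometheus.NewCounterVec(
            prometheus.CounterOpts{Name: "cache_hits_total", Help: "Cache hits by operation."},
            []string{"operation"},
        ),
        misses: prometheus.NewCounterVec(
            prometheus.CounterOpts{Name: "cache_misses_total", Help: "Cache misses by operation."},
            []string{"operation"},
        ),
        latency: prometheus.NewHistogramVec(
            prometheus.HistogramOpts{Name: "cache_op_seconds", Help: "Cache operation latency."},
            []string{"operation"},
        ),
    }
    reg.MustRegister(m.hits, m.misses, m.latency)
    return m
}
```

The hit rate is then `cache_hits_total / (cache_hits_total + cache_misses_total)`, which most dashboards can derive directly.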