RAG Workflow
Section titled “RAG Workflow”This example builds a production-ready Retrieval-Augmented Generation (RAG) pipeline that retrieves relevant documents, synthesizes answers, and provides citations.
What You’ll Build
Section titled “What You’ll Build”

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Validate   │────▶│   Expand    │────▶│   Search    │────▶│   Rerank    │
│   Query     │     │   Query     │     │   Vectors   │     │   Results   │
└─────────────┘     └─────────────┘     └─────────────┘     └──────┬──────┘
                                                                   │
       ┌───────────────────────────────────────────────────────────┘
       ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Filter    │────▶│ Synthesize  │────▶│   Format    │
│  Relevant   │     │   Answer    │     │   Response  │
└─────────────┘     └─────────────┘     └─────────────┘

Use Cases
Section titled “Use Cases”- Documentation Q&A: Answer questions using product documentation
- Knowledge base search: Find and synthesize information from internal docs
- Research assistant: Retrieve and summarize academic papers
- Customer support: Ground responses in help articles and FAQs
Complete Implementation
Section titled “Complete Implementation”Setup and Imports
Section titled “Setup and Imports”package main
import ( "context" "encoding/json" "fmt" "log" "os" "time"
"github.com/petal-labs/iris/providers/openai" "github.com/petal-labs/petalflow" "github.com/petal-labs/petalflow/irisadapter")
// Document represents a retrieved documenttype Document struct { ID string `json:"id"` Title string `json:"title"` Content string `json:"content"` URL string `json:"url"` Score float64 `json:"score"` Metadata map[string]string `json:"metadata"`}
// VectorStore interface for document retrievaltype VectorStore interface { Search(ctx context.Context, query string, topK int) ([]Document, error) SearchWithFilter(ctx context.Context, query string, filter map[string]any, topK int) ([]Document, error)}
func main() { // Initialize providers provider := openai.New(os.Getenv("OPENAI_API_KEY")) client := irisadapter.NewProviderAdapter(provider)
// Initialize vector store (e.g., Qdrant, Pinecone, pgvector) vectorStore := initVectorStore()
// Build and run graph := buildRAGGraph(client, vectorStore) runRAGWorkflow(graph)}Building the Graph
Section titled “Building the Graph”func buildRAGGraph(client *irisadapter.ProviderAdapter, vectorStore VectorStore) petalflow.Graph { g := petalflow.NewGraph("rag-pipeline")
// Stage 1: Validate Query validateNode := petalflow.NewGuardianNode("validate_query", petalflow.GuardianNodeConfig{ Checks: []petalflow.GuardCheck{ {Var: "query", Op: petalflow.OpNotEmpty, Message: "Query cannot be empty"}, {Var: "query", Op: petalflow.OpMinLength, Value: 3, Message: "Query too short"}, {Var: "query", Op: petalflow.OpMaxLength, Value: 1000, Message: "Query too long"}, }, OnFail: petalflow.GuardActionReject, })
// Stage 2: Query Expansion (generate alternative phrasings) expandNode := petalflow.NewLLMNode("expand_query", client, petalflow.LLMNodeConfig{ Model: "gpt-4o-mini", SystemPrompt: `Generate 3 alternative phrasings of the user's question to improve search recall.Return as JSON array: ["phrasing1", "phrasing2", "phrasing3"]Keep the same intent but vary vocabulary and structure.`, PromptTemplate: "Original question: {{.Vars.query}}", OutputKey: "expanded_queries", ResponseFormat: petalflow.ResponseFormatJSON, Temperature: 0.7, })
// Stage 3: Vector Search searchNode := petalflow.NewToolNode("search_vectors", petalflow.ToolNodeConfig{ ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error { query := env.GetVarString("query") expandedRaw := env.GetVar("expanded_queries")
// Parse expanded queries var expandedQueries []string if err := json.Unmarshal([]byte(expandedRaw.(string)), &expandedQueries); err != nil { expandedQueries = []string{} // Fall back to original only }
// Search with original + expanded queries allQueries := append([]string{query}, expandedQueries...) var allResults []Document seen := make(map[string]bool)
for _, q := range allQueries { results, err := vectorStore.Search(ctx, q, 5) if err != nil { log.Printf("Search error for '%s': %v", q, err) continue }
for _, doc := range results { if !seen[doc.ID] { seen[doc.ID] = true allResults = append(allResults, doc) } } }
env.SetVar("search_results", allResults) env.SetVar("search_count", len(allResults)) return nil }, Timeout: 10 * time.Second, })
// Stage 4: Rerank Results rerankNode := petalflow.NewLLMNode("rerank_results", client, petalflow.LLMNodeConfig{ Model: "gpt-4o-mini", SystemPrompt: `You are a relevance ranker. Given a query and documents, score each document's relevance from 0-10.Return JSON: [{"id": "doc_id", "score": 8, "reason": "brief reason"}]Only include documents scoring 5 or higher.`, PromptTemplate: `Query: {{.Vars.query}}
Documents:{{range $i, $doc := .Vars.search_results}}[{{$doc.ID}}] {{$doc.Title}}{{$doc.Content}}---{{end}}
Rank these documents by relevance to the query.`, OutputKey: "reranked_results", ResponseFormat: petalflow.ResponseFormatJSON, Temperature: 0.1, })
// Stage 5: Filter to Top Results filterNode := petalflow.NewTransformNode("filter_relevant", petalflow.TransformNodeConfig{ Transform: func(inputs map[string]any) (any, error) { searchResults := inputs["search_results"].([]Document) rerankedRaw := inputs["reranked_results"].(string)
var reranked []struct { ID string `json:"id"` Score float64 `json:"score"` Reason string `json:"reason"` } if err := json.Unmarshal([]byte(rerankedRaw), &reranked); err != nil { // Fall back to original results if len(searchResults) > 5 { return searchResults[:5], nil } return searchResults, nil }
// Build map for quick lookup docMap := make(map[string]Document) for _, doc := range searchResults { docMap[doc.ID] = doc }
// Get top reranked documents var filtered []Document for _, r := range reranked { if r.Score >= 5 && len(filtered) < 5 { if doc, ok := docMap[r.ID]; ok { doc.Score = r.Score // Update with rerank score filtered = append(filtered, doc) } } }
return filtered, nil }, InputKeys: []string{"search_results", "reranked_results"}, OutputKey: "context_documents", })
// Stage 6: Check if we have enough context contextRouter := petalflow.NewRuleRouter("context_check", petalflow.RuleRouterConfig{ Routes: []petalflow.RouteRule{ { When: petalflow.RouteCondition{ Var: "context_documents", Op: petalflow.OpEmpty, }, To: "no_context_response", }, }, Default: "synthesize_answer", })
// No context fallback noContextNode := petalflow.NewTransformNode("no_context_response", petalflow.TransformNodeConfig{ Transform: func(inputs map[string]any) (any, error) { return map[string]any{ "answer": "I couldn't find relevant information to answer your question. Please try rephrasing or ask about a different topic.", "sources": []Document{}, "confidence": "low", }, nil }, OutputKey: "synthesis_result", })
// Stage 7: Synthesize Answer synthesizeNode := petalflow.NewLLMNode("synthesize_answer", client, petalflow.LLMNodeConfig{ Model: "gpt-4o", SystemPrompt: `You are a helpful assistant that answers questions using provided context.
Guidelines:- Only use information from the provided documents- If the context doesn't contain enough information, say so- Cite sources using [1], [2], etc. notation- Be concise but thorough- If documents contradict each other, acknowledge the discrepancy`, PromptTemplate: `Context Documents:{{range $i, $doc := .Vars.context_documents}}[{{add $i 1}}] {{$doc.Title}} ({{$doc.URL}}){{$doc.Content}}
{{end}}
Question: {{.Vars.query}}
Provide a comprehensive answer based on the context above. Include citations.`, OutputKey: "raw_answer", Temperature: 0.3, MaxTokens: 1000, })
// Stage 8: Format Response with Citations formatNode := petalflow.NewTransformNode("format_response", petalflow.TransformNodeConfig{ Transform: func(inputs map[string]any) (any, error) { answer := inputs["raw_answer"].(string) docs := inputs["context_documents"].([]Document)
// Build sources list sources := make([]map[string]any, len(docs)) for i, doc := range docs { sources[i] = map[string]any{ "index": i + 1, "title": doc.Title, "url": doc.URL, "snippet": truncate(doc.Content, 200), } }
return map[string]any{ "answer": answer, "sources": sources, "confidence": calculateConfidence(docs), "query": inputs["query"], }, nil }, InputKeys: []string{"raw_answer", "context_documents", "query"}, OutputKey: "final_response", })
// Add all nodes g.AddNode(validateNode) g.AddNode(expandNode) g.AddNode(searchNode) g.AddNode(rerankNode) g.AddNode(filterNode) g.AddNode(contextRouter) g.AddNode(noContextNode) g.AddNode(synthesizeNode) g.AddNode(formatNode)
// Define edges g.AddEdge("validate_query", "expand_query") g.AddEdge("expand_query", "search_vectors") g.AddEdge("search_vectors", "rerank_results") g.AddEdge("rerank_results", "filter_relevant") g.AddEdge("filter_relevant", "context_check") g.AddEdge("context_check", "no_context_response") g.AddEdge("context_check", "synthesize_answer") g.AddEdge("no_context_response", "format_response") g.AddEdge("synthesize_answer", "format_response")
g.SetEntry("validate_query")
return g}
// Helper functions

// truncate shortens s to at most maxLen bytes, appending "..." when text
// is cut. Guards against maxLen <= 3: the unguarded s[:maxLen-3] would
// panic with a negative slice index for maxLen < 3.
// NOTE(review): the cut is byte-based and may split a multi-byte UTF-8
// rune at the boundary — acceptable for a display snippet, but confirm.
func truncate(s string, maxLen int) string {
	if len(s) <= maxLen {
		return s
	}
	if maxLen <= 3 {
		// No room for an ellipsis; hard-cut instead of panicking.
		return s[:maxLen]
	}
	return s[:maxLen-3] + "..."
}
func calculateConfidence(docs []Document) string { if len(docs) == 0 { return "low" } avgScore := 0.0 for _, doc := range docs { avgScore += doc.Score } avgScore /= float64(len(docs))
if avgScore >= 8 { return "high" } else if avgScore >= 5 { return "medium" } return "low"}Running the Workflow
Section titled “Running the Workflow”func runRAGWorkflow(graph petalflow.Graph) { runtime := petalflow.NewRuntime()
env := petalflow.NewEnvelope() env.SetVar("query", "How do I configure webhook authentication in the API?")
// Event handler for timing var nodeTimings []string handler := func(event petalflow.Event) { if event.Kind == petalflow.EventNodeEnd { nodeTimings = append(nodeTimings, fmt.Sprintf("%s: %v", event.NodeID, event.Duration)) } }
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel()
result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{ EventHandler: handler, }) if err != nil { log.Fatalf("RAG workflow failed: %v", err) }
// Output response := result.GetVar("final_response").(map[string]any) fmt.Printf("\n=== RAG Response ===\n") fmt.Printf("Query: %s\n\n", response["query"]) fmt.Printf("Answer:\n%s\n\n", response["answer"]) fmt.Printf("Confidence: %s\n\n", response["confidence"]) fmt.Printf("Sources:\n") for _, src := range response["sources"].([]map[string]any) { fmt.Printf(" [%d] %s\n %s\n", src["index"], src["title"], src["url"]) }
fmt.Printf("\nTimings:\n") for _, t := range nodeTimings { fmt.Printf(" %s\n", t) }}Example Output
Section titled “Example Output”=== RAG Response ===Query: How do I configure webhook authentication in the API?
Answer:To configure webhook authentication in the API, you have several options [1]:
1. **HMAC Signature Verification**: Include a secret key in your webhook configuration. The API will sign each payload with HMAC-SHA256, and you can verify the signature using the `X-Webhook-Signature` header [1].
2. **Basic Authentication**: Provide a username and password in your endpoint URL (e.g., `https://user:pass@yourserver.com/webhook`) [2].
3. **Bearer Token**: Configure a bearer token that will be included in the `Authorization` header of each webhook request [1].
For production use, HMAC signature verification is recommended as it provides the strongest security guarantees [2].
Confidence: high
Sources: [1] Webhook Configuration Guide https://docs.example.com/webhooks/config [2] API Security Best Practices https://docs.example.com/security/best-practices
Timings: validate_query: 1ms expand_query: 287ms search_vectors: 145ms rerank_results: 412ms filter_relevant: 2ms context_check: 1ms synthesize_answer: 1.8s format_response: 1msVariations
Section titled “Variations”Adding Caching
Section titled “Adding Caching”Cache embeddings and frequent queries to reduce latency and cost:
// Cache search resultssearchCache := petalflow.NewCacheNode("cache_search", petalflow.CacheNodeConfig{ Store: redisStore, CacheKey: "rag:search:{{.Vars.query_hash}}", TTL: 1 * time.Hour,})
// Insert between validate and expandg.AddEdge("validate_query", "cache_search")g.AddEdge("cache_search", "expand_query") // On cache missHybrid Search (Vector + Keyword)
Section titled “Hybrid Search (Vector + Keyword)”Combine semantic and keyword search for better recall:
hybridSearchNode := petalflow.NewToolNode("hybrid_search", petalflow.ToolNodeConfig{ ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error { query := env.GetVarString("query")
// Parallel searches var wg sync.WaitGroup var vectorResults, keywordResults []Document
wg.Add(2) go func() { defer wg.Done() vectorResults, _ = vectorStore.Search(ctx, query, 10) }() go func() { defer wg.Done() keywordResults, _ = keywordStore.Search(ctx, query, 10) }() wg.Wait()
// Reciprocal Rank Fusion to combine results combined := reciprocalRankFusion(vectorResults, keywordResults) env.SetVar("search_results", combined) return nil },})Multi-Index Search
Section titled “Multi-Index Search”Search across multiple document collections:
multiIndexNode := petalflow.NewToolNode("multi_index_search", petalflow.ToolNodeConfig{ ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error { query := env.GetVarString("query") indices := []string{"docs", "faqs", "tutorials", "api_reference"}
var allResults []Document for _, idx := range indices { results, err := vectorStore.SearchIndex(ctx, idx, query, 3) if err != nil { continue } for _, doc := range results { doc.Metadata["source_index"] = idx allResults = append(allResults, doc) } }
env.SetVar("search_results", allResults) return nil },})Conversational RAG
Section titled “Conversational RAG”Maintain context across multiple turns:
// Reformulate query with conversation historyreformulateNode := petalflow.NewLLMNode("reformulate_query", client, petalflow.LLMNodeConfig{ Model: "gpt-4o-mini", SystemPrompt: `Given the conversation history, reformulate the latest query to be standalone.Include necessary context from previous turns so the query makes sense on its own.`, PromptTemplate: `Conversation:{{range .Messages}}{{.Role}}: {{.Content}}{{end}}
Latest query: {{.Vars.query}}
Reformulated standalone query:`, OutputKey: "reformulated_query",})
// Use reformulated query for searchg.AddEdge("reformulate_query", "expand_query")