
RAG Workflow

This example builds a production-ready Retrieval-Augmented Generation (RAG) pipeline that retrieves relevant documents, synthesizes answers, and provides citations.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Validate   │────▶│   Expand    │────▶│   Search    │────▶│   Rerank    │
│    Query    │     │    Query    │     │   Vectors   │     │   Results   │
└─────────────┘     └─────────────┘     └─────────────┘     └──────┬──────┘
       ┌───────────────────────────────────────────────────────────┘
┌──────▼──────┐     ┌─────────────┐     ┌─────────────┐
│   Filter    │────▶│ Synthesize  │────▶│   Format    │
│  Relevant   │     │   Answer    │     │  Response   │
└─────────────┘     └─────────────┘     └─────────────┘
Use cases:

  • Documentation Q&A: Answer questions using product documentation
  • Knowledge base search: Find and synthesize information from internal docs
  • Research assistant: Retrieve and summarize academic papers
  • Customer support: Ground responses in help articles and FAQs
The complete example:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/petal-labs/iris/providers/openai"
	"github.com/petal-labs/petalflow"
	"github.com/petal-labs/petalflow/irisadapter"
)
// Document represents a retrieved document.
type Document struct {
	ID       string            `json:"id"`
	Title    string            `json:"title"`
	Content  string            `json:"content"`
	URL      string            `json:"url"`
	Score    float64           `json:"score"`
	Metadata map[string]string `json:"metadata"`
}

// VectorStore is the interface the pipeline uses for document retrieval.
type VectorStore interface {
	Search(ctx context.Context, query string, topK int) ([]Document, error)
	SearchWithFilter(ctx context.Context, query string, filter map[string]any, topK int) ([]Document, error)
}
func main() {
	// Initialize providers
	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := irisadapter.NewProviderAdapter(provider)

	// Initialize vector store (e.g., Qdrant, Pinecone, pgvector)
	vectorStore := initVectorStore()

	// Build and run
	graph := buildRAGGraph(client, vectorStore)
	runRAGWorkflow(graph)
}
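
// initVectorStore is not defined in the original example. What follows is a
// minimal illustrative stub (the inMemoryStore type and its canned document
// are assumptions, not part of petalflow) so the program compiles end-to-end;
// in practice this would wrap a real client such as Qdrant, Pinecone, or pgvector.
type inMemoryStore struct {
	docs []Document
}

func (s *inMemoryStore) Search(ctx context.Context, query string, topK int) ([]Document, error) {
	// A real implementation would embed the query and run a similarity search.
	if topK > len(s.docs) {
		topK = len(s.docs)
	}
	return s.docs[:topK], nil
}

func (s *inMemoryStore) SearchWithFilter(ctx context.Context, query string, filter map[string]any, topK int) ([]Document, error) {
	// Metadata filtering is ignored in this stub.
	return s.Search(ctx, query, topK)
}

func initVectorStore() VectorStore {
	return &inMemoryStore{docs: []Document{{
		ID:      "doc-1",
		Title:   "Webhook Configuration Guide",
		URL:     "https://docs.example.com/webhooks/config",
		Content: "Webhooks support HMAC signatures, basic auth, and bearer tokens.",
		Score:   9.0,
	}}}
}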
func buildRAGGraph(client *irisadapter.ProviderAdapter, vectorStore VectorStore) petalflow.Graph {
	g := petalflow.NewGraph("rag-pipeline")

	// Stage 1: Validate Query
	validateNode := petalflow.NewGuardianNode("validate_query", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "query", Op: petalflow.OpNotEmpty, Message: "Query cannot be empty"},
			{Var: "query", Op: petalflow.OpMinLength, Value: 3, Message: "Query too short"},
			{Var: "query", Op: petalflow.OpMaxLength, Value: 1000, Message: "Query too long"},
		},
		OnFail: petalflow.GuardActionReject,
	})

	// Stage 2: Query Expansion (generate alternative phrasings)
	expandNode := petalflow.NewLLMNode("expand_query", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o-mini",
		SystemPrompt: `Generate 3 alternative phrasings of the user's question to improve search recall.
Return as JSON array: ["phrasing1", "phrasing2", "phrasing3"]
Keep the same intent but vary vocabulary and structure.`,
		PromptTemplate: "Original question: {{.Vars.query}}",
		OutputKey:      "expanded_queries",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.7,
	})
	// Stage 3: Vector Search
	searchNode := petalflow.NewToolNode("search_vectors", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			query := env.GetVarString("query")

			// Parse expanded queries; fall back to the original query alone
			// if expansion produced nothing usable. The comma-ok assertion
			// avoids a panic when the var is missing or not a string.
			var expandedQueries []string
			if raw, ok := env.GetVar("expanded_queries").(string); ok {
				if err := json.Unmarshal([]byte(raw), &expandedQueries); err != nil {
					expandedQueries = nil
				}
			}

			// Search with original + expanded queries, de-duplicating by ID
			allQueries := append([]string{query}, expandedQueries...)
			var allResults []Document
			seen := make(map[string]bool)
			for _, q := range allQueries {
				results, err := vectorStore.Search(ctx, q, 5)
				if err != nil {
					log.Printf("Search error for '%s': %v", q, err)
					continue
				}
				for _, doc := range results {
					if !seen[doc.ID] {
						seen[doc.ID] = true
						allResults = append(allResults, doc)
					}
				}
			}

			env.SetVar("search_results", allResults)
			env.SetVar("search_count", len(allResults))
			return nil
		},
		Timeout: 10 * time.Second,
	})
	// Stage 4: Rerank Results
	rerankNode := petalflow.NewLLMNode("rerank_results", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o-mini",
		SystemPrompt: `You are a relevance ranker. Given a query and documents, score each document's relevance from 0-10.
Return JSON: [{"id": "doc_id", "score": 8, "reason": "brief reason"}]
Only include documents scoring 5 or higher.`,
		PromptTemplate: `Query: {{.Vars.query}}
Documents:
{{range $i, $doc := .Vars.search_results}}
[{{$doc.ID}}] {{$doc.Title}}
{{$doc.Content}}
---
{{end}}
Rank these documents by relevance to the query.`,
		OutputKey:      "reranked_results",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.1,
	})
	// Stage 5: Filter to Top Results
	filterNode := petalflow.NewTransformNode("filter_relevant", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			searchResults := inputs["search_results"].([]Document)
			rerankedRaw := inputs["reranked_results"].(string)

			var reranked []struct {
				ID     string  `json:"id"`
				Score  float64 `json:"score"`
				Reason string  `json:"reason"`
			}
			if err := json.Unmarshal([]byte(rerankedRaw), &reranked); err != nil {
				// Fall back to the original results
				if len(searchResults) > 5 {
					return searchResults[:5], nil
				}
				return searchResults, nil
			}

			// Build map for quick lookup
			docMap := make(map[string]Document)
			for _, doc := range searchResults {
				docMap[doc.ID] = doc
			}

			// Get top reranked documents
			var filtered []Document
			for _, r := range reranked {
				if r.Score >= 5 && len(filtered) < 5 {
					if doc, ok := docMap[r.ID]; ok {
						doc.Score = r.Score // Update with rerank score
						filtered = append(filtered, doc)
					}
				}
			}
			return filtered, nil
		},
		InputKeys: []string{"search_results", "reranked_results"},
		OutputKey: "context_documents",
	})
	// Stage 6: Check if we have enough context
	contextRouter := petalflow.NewRuleRouter("context_check", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			{
				When: petalflow.RouteCondition{
					Var: "context_documents",
					Op:  petalflow.OpEmpty,
				},
				To: "no_context_response",
			},
		},
		Default: "synthesize_answer",
	})

	// No-context fallback. It writes the canned message to raw_answer so
	// that format_response works identically on both branches; with empty
	// context_documents, the sources come out empty and confidence "low".
	noContextNode := petalflow.NewTransformNode("no_context_response", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			return "I couldn't find relevant information to answer your question. Please try rephrasing or ask about a different topic.", nil
		},
		OutputKey: "raw_answer",
	})
	// Stage 7: Synthesize Answer
	synthesizeNode := petalflow.NewLLMNode("synthesize_answer", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o",
		SystemPrompt: `You are a helpful assistant that answers questions using provided context.
Guidelines:
- Only use information from the provided documents
- If the context doesn't contain enough information, say so
- Cite sources using [1], [2], etc. notation
- Be concise but thorough
- If documents contradict each other, acknowledge the discrepancy`,
		// Note: the template assumes the engine exposes an `add` helper
		// for producing 1-based citation indices.
		PromptTemplate: `Context Documents:
{{range $i, $doc := .Vars.context_documents}}
[{{add $i 1}}] {{$doc.Title}} ({{$doc.URL}})
{{$doc.Content}}
{{end}}
Question: {{.Vars.query}}
Provide a comprehensive answer based on the context above. Include citations.`,
		OutputKey:   "raw_answer",
		Temperature: 0.3,
		MaxTokens:   1000,
	})
	// Stage 8: Format Response with Citations
	formatNode := petalflow.NewTransformNode("format_response", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			answer := inputs["raw_answer"].(string)
			docs := inputs["context_documents"].([]Document)

			// Build sources list
			sources := make([]map[string]any, len(docs))
			for i, doc := range docs {
				sources[i] = map[string]any{
					"index":   i + 1,
					"title":   doc.Title,
					"url":     doc.URL,
					"snippet": truncate(doc.Content, 200),
				}
			}

			return map[string]any{
				"answer":     answer,
				"sources":    sources,
				"confidence": calculateConfidence(docs),
				"query":      inputs["query"],
			}, nil
		},
		InputKeys: []string{"raw_answer", "context_documents", "query"},
		OutputKey: "final_response",
	})
	// Add all nodes
	g.AddNode(validateNode)
	g.AddNode(expandNode)
	g.AddNode(searchNode)
	g.AddNode(rerankNode)
	g.AddNode(filterNode)
	g.AddNode(contextRouter)
	g.AddNode(noContextNode)
	g.AddNode(synthesizeNode)
	g.AddNode(formatNode)

	// Define edges
	g.AddEdge("validate_query", "expand_query")
	g.AddEdge("expand_query", "search_vectors")
	g.AddEdge("search_vectors", "rerank_results")
	g.AddEdge("rerank_results", "filter_relevant")
	g.AddEdge("filter_relevant", "context_check")
	g.AddEdge("context_check", "no_context_response")
	g.AddEdge("context_check", "synthesize_answer")
	g.AddEdge("no_context_response", "format_response")
	g.AddEdge("synthesize_answer", "format_response")

	g.SetEntry("validate_query")
	return g
}
// Helper functions

// truncate shortens s to at most maxLen characters, appending an ellipsis.
// Converting to runes avoids splitting multi-byte UTF-8 characters.
func truncate(s string, maxLen int) string {
	runes := []rune(s)
	if len(runes) <= maxLen {
		return s
	}
	if maxLen <= 3 {
		return string(runes[:maxLen])
	}
	return string(runes[:maxLen-3]) + "..."
}

func calculateConfidence(docs []Document) string {
	if len(docs) == 0 {
		return "low"
	}
	avgScore := 0.0
	for _, doc := range docs {
		avgScore += doc.Score
	}
	avgScore /= float64(len(docs))

	switch {
	case avgScore >= 8:
		return "high"
	case avgScore >= 5:
		return "medium"
	default:
		return "low"
	}
}
func runRAGWorkflow(graph petalflow.Graph) {
	runtime := petalflow.NewRuntime()

	env := petalflow.NewEnvelope()
	env.SetVar("query", "How do I configure webhook authentication in the API?")

	// Event handler for per-node timing
	var nodeTimings []string
	handler := func(event petalflow.Event) {
		if event.Kind == petalflow.EventNodeEnd {
			nodeTimings = append(nodeTimings, fmt.Sprintf("%s: %v", event.NodeID, event.Duration))
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
		EventHandler: handler,
	})
	if err != nil {
		log.Fatalf("RAG workflow failed: %v", err)
	}

	// Output
	response := result.GetVar("final_response").(map[string]any)
	fmt.Printf("\n=== RAG Response ===\n")
	fmt.Printf("Query: %s\n\n", response["query"])
	fmt.Printf("Answer:\n%s\n\n", response["answer"])
	fmt.Printf("Confidence: %s\n\n", response["confidence"])
	fmt.Printf("Sources:\n")
	for _, src := range response["sources"].([]map[string]any) {
		fmt.Printf("  [%d] %s\n      %s\n", src["index"], src["title"], src["url"])
	}
	fmt.Printf("\nTimings:\n")
	for _, t := range nodeTimings {
		fmt.Printf("  %s\n", t)
	}
}
Running the workflow produces output like:

=== RAG Response ===

Query: How do I configure webhook authentication in the API?

Answer:
To configure webhook authentication in the API, you have several options [1]:

1. **HMAC Signature Verification**: Include a secret key in your webhook configuration.
   The API will sign each payload with HMAC-SHA256, and you can verify the signature
   using the `X-Webhook-Signature` header [1].

2. **Basic Authentication**: Provide a username and password in your endpoint URL
   (e.g., `https://user:pass@yourserver.com/webhook`) [2].

3. **Bearer Token**: Configure a bearer token that will be included in the
   `Authorization` header of each webhook request [1].

For production use, HMAC signature verification is recommended as it provides
the strongest security guarantees [2].

Confidence: high

Sources:
  [1] Webhook Configuration Guide
      https://docs.example.com/webhooks/config
  [2] API Security Best Practices
      https://docs.example.com/security/best-practices

Timings:
  validate_query: 1ms
  expand_query: 287ms
  search_vectors: 145ms
  rerank_results: 412ms
  filter_relevant: 2ms
  context_check: 1ms
  synthesize_answer: 1.8s
  format_response: 1ms

Cache embeddings and frequent queries to reduce latency and cost:

// Cache search results keyed by a hash of the query.
// Note: redisStore is assumed to be a configured cache store, and
// query_hash must be set by an earlier step (see the sketch below).
searchCache := petalflow.NewCacheNode("cache_search", petalflow.CacheNodeConfig{
	Store:    redisStore,
	CacheKey: "rag:search:{{.Vars.query_hash}}",
	TTL:      1 * time.Hour,
})

// Insert between validate and expand
g.AddEdge("validate_query", "cache_search")
g.AddEdge("cache_search", "expand_query") // On cache miss

Combine semantic and keyword search for better recall:

// Note: keywordStore is assumed to be a separate keyword/BM25 index with
// the same Search signature; this snippet also needs the sync package.
hybridSearchNode := petalflow.NewToolNode("hybrid_search", petalflow.ToolNodeConfig{
	ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
		query := env.GetVarString("query")

		// Run both searches in parallel; each goroutine writes to its
		// own variable, so no extra locking is needed.
		var wg sync.WaitGroup
		var vectorResults, keywordResults []Document
		wg.Add(2)
		go func() {
			defer wg.Done()
			vectorResults, _ = vectorStore.Search(ctx, query, 10)
		}()
		go func() {
			defer wg.Done()
			keywordResults, _ = keywordStore.Search(ctx, query, 10)
		}()
		wg.Wait()

		// Reciprocal Rank Fusion to combine results
		combined := reciprocalRankFusion(vectorResults, keywordResults)
		env.SetVar("search_results", combined)
		return nil
	},
})
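
reciprocalRankFusion is left undefined above. A minimal sketch of the standard RRF scheme, where each document's fused score is the sum of 1/(k + rank) across the lists it appears in (k = 60 is the conventional constant; uses the sort package):

// reciprocalRankFusion merges ranked result lists, scoring each document
// by the sum of 1/(k + rank) over every list that contains it.
func reciprocalRankFusion(lists ...[]Document) []Document {
	const k = 60.0
	scores := make(map[string]float64)
	byID := make(map[string]Document)

	for _, list := range lists {
		for rank, doc := range list {
			scores[doc.ID] += 1.0 / (k + float64(rank+1))
			byID[doc.ID] = doc
		}
	}

	// Collect and sort by fused score, highest first
	fused := make([]Document, 0, len(byID))
	for id, doc := range byID {
		doc.Score = scores[id]
		fused = append(fused, doc)
	}
	sort.Slice(fused, func(i, j int) bool { return fused[i].Score > fused[j].Score })
	return fused
}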

Search across multiple document collections:

// Note: SearchIndex is not part of the VectorStore interface defined
// earlier; it assumes a store that can target a named index/collection
// (see the interface sketch below).
multiIndexNode := petalflow.NewToolNode("multi_index_search", petalflow.ToolNodeConfig{
	ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
		query := env.GetVarString("query")
		indices := []string{"docs", "faqs", "tutorials", "api_reference"}

		var allResults []Document
		for _, idx := range indices {
			results, err := vectorStore.SearchIndex(ctx, idx, query, 3)
			if err != nil {
				continue
			}
			for _, doc := range results {
				// Tag each hit with the index it came from; guard against
				// a nil Metadata map, which would panic on write.
				if doc.Metadata == nil {
					doc.Metadata = make(map[string]string)
				}
				doc.Metadata["source_index"] = idx
				allResults = append(allResults, doc)
			}
		}

		env.SetVar("search_results", allResults)
		return nil
	},
})
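
One way to model SearchIndex is a small extension of the base interface, sketched here with the illustrative name MultiIndexStore:

// MultiIndexStore extends VectorStore with per-index search.
type MultiIndexStore interface {
	VectorStore
	SearchIndex(ctx context.Context, index, query string, topK int) ([]Document, error)
}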

Maintain context across multiple turns:

// Reformulate the latest query with conversation history so it is
// self-contained before expansion and retrieval.
reformulateNode := petalflow.NewLLMNode("reformulate_query", client, petalflow.LLMNodeConfig{
	Model: "gpt-4o-mini",
	SystemPrompt: `Given the conversation history, reformulate the latest query to be standalone.
Include necessary context from previous turns so the query makes sense on its own.`,
	PromptTemplate: `Conversation:
{{range .Messages}}{{.Role}}: {{.Content}}
{{end}}
Latest query: {{.Vars.query}}
Reformulated standalone query:`,
	OutputKey: "reformulated_query",
})

// Use reformulated query for search; downstream templates that read
// {{.Vars.query}} should switch to {{.Vars.reformulated_query}}.
g.AddEdge("reformulate_query", "expand_query")