
RAG Pipeline

This example demonstrates a complete retrieval-augmented generation (RAG) pipeline that combines vector search with a language model to answer questions grounded in your document corpus.

The pipeline:

  1. Embeds documents using Voyage AI or OpenAI embeddings
  2. Searches a vector database for relevant passages
  3. Reranks results for improved relevance
  4. Synthesizes grounded answers with source citations
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    Query    │────▶│    Embed    │────▶│   Search    │────▶│   Rerank    │
│             │     │    Query    │     │  Vector DB  │     │   Results   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘

┌─────────────┐     ┌─────────────┐
│   Answer    │◀────│    Build    │
│   + Cite    │     │   Context   │
└─────────────┘     └─────────────┘
# Install dependencies
go get github.com/petal-labs/iris
# Set up API keys
iris keys set openai
iris keys set voyageai
package main
import (
"context"
"fmt"
"log"
"math"
"sort"
"strings"
"time"
"github.com/petal-labs/iris/core"
"github.com/petal-labs/iris/providers/openai"
"github.com/petal-labs/iris/providers/voyageai"
)
// Document represents a searchable document
type Document struct {
ID string
Title string
Content string
Metadata map[string]string
}
// SearchResult contains a document with its relevance score
type SearchResult struct {
Document Document
Score float64
}
// RAGPipeline orchestrates the retrieval and generation process
type RAGPipeline struct {
embedProvider *voyageai.Provider
chatProvider *openai.Provider
chatClient *core.Client
documents []Document
embeddings map[string][]float64
}
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()
// Initialize the pipeline
pipeline, err := NewRAGPipeline()
if err != nil {
log.Fatal(err)
}
// Load and index documents
documents := loadSampleDocuments()
if err := pipeline.IndexDocuments(ctx, documents); err != nil {
log.Fatal(err)
}
// Answer a question
query := "How do I handle errors in the payment processing system?"
answer, err := pipeline.Answer(ctx, query)
if err != nil {
log.Fatal(err)
}
fmt.Println("Question:", query)
fmt.Println("\nAnswer:", answer.Text)
fmt.Println("\nSources:")
for _, source := range answer.Sources {
fmt.Printf(" - %s (score: %.3f)\n", source.Title, source.Score)
}
}
// NewRAGPipeline creates a new RAG pipeline with configured providers
func NewRAGPipeline() (*RAGPipeline, error) {
// Initialize embedding provider (Voyage AI for quality)
embedProvider, err := voyageai.NewFromKeystore()
if err != nil {
embedProvider, err = voyageai.NewFromEnv()
if err != nil {
return nil, fmt.Errorf("voyage AI not configured: %w", err)
}
}
// Initialize chat provider (OpenAI for generation)
chatProvider, err := openai.NewFromKeystore()
if err != nil {
chatProvider, err = openai.NewFromEnv()
if err != nil {
return nil, fmt.Errorf("openai not configured: %w", err)
}
}
chatClient := core.NewClient(chatProvider,
core.WithRetryPolicy(&core.RetryPolicy{
MaxRetries: 3,
InitialInterval: 1 * time.Second,
MaxInterval: 30 * time.Second,
BackoffMultiplier: 2.0,
RetryOn: []int{429, 500, 503},
}),
)
return &RAGPipeline{
embedProvider: embedProvider,
chatProvider: chatProvider,
chatClient: chatClient,
embeddings: make(map[string][]float64),
}, nil
}
// IndexDocuments embeds and indexes a collection of documents
func (p *RAGPipeline) IndexDocuments(ctx context.Context, docs []Document) error {
p.documents = docs
// Prepare embedding inputs with context for better retrieval
inputs := make([]core.EmbeddingInput, len(docs))
for i, doc := range docs {
inputs[i] = core.EmbeddingInput{
Text: doc.Content,
Context: doc.Title, // Contextualized embedding
}
}
// Generate embeddings in batches
const batchSize = 64
for i := 0; i < len(inputs); i += batchSize {
end := min(i+batchSize, len(inputs))
batch := inputs[i:end]
resp, err := p.embedProvider.Embeddings(ctx, &core.EmbeddingRequest{
Model: "voyage-3-large",
Input: batch,
InputType: core.InputTypeDocument,
})
if err != nil {
return fmt.Errorf("embedding batch %d: %w", i/batchSize, err)
}
for j, emb := range resp.Embeddings {
docIdx := i + j
p.embeddings[docs[docIdx].ID] = emb.Values
}
}
fmt.Printf("Indexed %d documents\n", len(docs))
return nil
}
// Answer generates a grounded response to a query
func (p *RAGPipeline) Answer(ctx context.Context, query string) (*AnswerResult, error) {
// Step 1: Embed the query
queryEmb, err := p.embedProvider.Embeddings(ctx, &core.EmbeddingRequest{
Model: "voyage-3-large",
Input: []core.EmbeddingInput{{Text: query}},
InputType: core.InputTypeQuery,
})
if err != nil {
return nil, fmt.Errorf("embedding query: %w", err)
}
// Step 2: Search for relevant documents
results := p.search(queryEmb.Embeddings[0].Values, 10)
// Step 3: Rerank results for better relevance
reranked, err := p.rerank(ctx, query, results)
if err != nil {
return nil, fmt.Errorf("reranking: %w", err)
}
// Step 4: Build context from top results
contextDocs := reranked[:min(5, len(reranked))]
docContext := p.buildContext(contextDocs)
// Step 5: Generate grounded answer
answer, err := p.generate(ctx, query, docContext, contextDocs)
if err != nil {
return nil, fmt.Errorf("generating answer: %w", err)
}
return answer, nil
}
// search performs vector similarity search
func (p *RAGPipeline) search(queryVec []float64, topK int) []SearchResult {
type scored struct {
doc Document
score float64
}
var results []scored
for _, doc := range p.documents {
docVec := p.embeddings[doc.ID]
score := cosineSimilarity(queryVec, docVec)
results = append(results, scored{doc: doc, score: score})
}
// Sort by score descending
sort.Slice(results, func(i, j int) bool {
return results[i].score > results[j].score
})
// Return top K
topK = min(topK, len(results))
searchResults := make([]SearchResult, topK)
for i := 0; i < topK; i++ {
searchResults[i] = SearchResult{
Document: results[i].doc,
Score: results[i].score,
}
}
return searchResults
}
// rerank improves result ordering using a reranking model
func (p *RAGPipeline) rerank(ctx context.Context, query string, results []SearchResult) ([]SearchResult, error) {
if len(results) == 0 {
return results, nil
}
// Extract document texts
docs := make([]string, len(results))
for i, r := range results {
docs[i] = r.Document.Content
}
// Call reranking API
reranked, err := p.embedProvider.Rerank(ctx, &core.RerankRequest{
Model: "rerank-2",
Query: query,
Documents: docs,
TopK: len(docs),
ReturnDocuments: false,
})
if err != nil {
// Fall back to original ordering if reranking fails
return results, nil
}
// Reorder results based on reranking scores
reorderedResults := make([]SearchResult, len(reranked.Results))
for i, r := range reranked.Results {
reorderedResults[i] = SearchResult{
Document: results[r.Index].Document,
Score: r.Score,
}
}
return reorderedResults, nil
}
// buildContext creates a formatted context string from documents
func (p *RAGPipeline) buildContext(results []SearchResult) string {
var sb strings.Builder
for i, r := range results {
sb.WriteString(fmt.Sprintf("[Source %d: %s]\n", i+1, r.Document.Title))
sb.WriteString(r.Document.Content)
sb.WriteString("\n\n")
}
return sb.String()
}
// AnswerResult contains the generated answer with sources
type AnswerResult struct {
Text string
Sources []SourceRef
}
// SourceRef references a source document
type SourceRef struct {
Title string
ID string
Score float64
}
// generate creates a grounded answer using the chat model
func (p *RAGPipeline) generate(ctx context.Context, query, docContext string, sources []SearchResult) (*AnswerResult, error) {
systemPrompt := `You are a helpful assistant that answers questions based on the provided context.
Rules:
1. Only use information from the provided sources
2. Cite sources using [Source N] format
3. If the context doesn't contain enough information, say so
4. Be concise but thorough`
userPrompt := fmt.Sprintf(`Context:
%s
Question: %s
Provide a comprehensive answer based on the sources above.`, docContext, query)
resp, err := p.chatClient.Chat("gpt-4o").
System(systemPrompt).
User(userPrompt).
Temperature(0.3).
MaxTokens(1000).
GetResponse(ctx)
if err != nil {
return nil, err
}
// Build source references
sourceRefs := make([]SourceRef, len(sources))
for i, s := range sources {
sourceRefs[i] = SourceRef{
Title: s.Document.Title,
ID: s.Document.ID,
Score: s.Score,
}
}
return &AnswerResult{
Text: resp.Output,
Sources: sourceRefs,
}, nil
}
// cosineSimilarity computes similarity between two vectors
func cosineSimilarity(a, b []float64) float64 {
if len(a) != len(b) {
return 0
}
var dot, normA, normB float64
for i := range a {
dot += a[i] * b[i]
normA += a[i] * a[i]
normB += b[i] * b[i]
}
if normA == 0 || normB == 0 {
return 0
}
return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
// loadSampleDocuments returns sample documents for demonstration
func loadSampleDocuments() []Document {
return []Document{
{
ID: "doc-001",
Title: "Payment Processing Guide",
Content: `The payment processing system handles all transactions through a multi-step pipeline.
First, payments are validated for correct format and sufficient funds. Then, they're sent to the
payment gateway for authorization. If authorization fails, the system retries up to 3 times with
exponential backoff. Errors are logged to the payments_errors table with full stack traces.
Common error codes: E001 (insufficient funds), E002 (card expired), E003 (gateway timeout).`,
},
{
ID: "doc-002",
Title: "Error Handling Best Practices",
Content: `All services should implement structured error handling. Use typed errors that implement
the error interface. Wrap errors with context using fmt.Errorf with %w verb. Log errors at the
point of origin with full context. For payment errors specifically, always include transaction ID,
amount, and error code. Implement circuit breakers for external service calls to prevent cascade
failures.`,
},
{
ID: "doc-003",
Title: "Retry Policies",
Content: `Retry policies should be configured per-service based on failure characteristics.
For idempotent operations like payment status checks, aggressive retries are safe. For
non-idempotent operations like payment creation, use idempotency keys and conservative retries.
Default retry config: max 3 attempts, initial delay 1s, max delay 30s, multiplier 2.0.
Always implement jitter to prevent thundering herd problems.`,
},
{
ID: "doc-004",
Title: "Monitoring and Alerting",
Content: `Payment system monitoring includes real-time metrics for transaction volume,
success rates, and latency percentiles. Alert thresholds: error rate > 5% triggers P2,
error rate > 15% triggers P1. All payment errors are automatically reported to PagerDuty
with transaction context. Dashboard available at /metrics/payments.`,
},
{
ID: "doc-005",
Title: "Database Schema",
Content: `The payments table stores all transaction records. Key columns: id (UUID),
amount (decimal), currency (varchar), status (enum), created_at (timestamp), updated_at (timestamp).
The payments_errors table stores error details: id, payment_id (FK), error_code, error_message,
stack_trace, created_at. Indexes exist on payment_id and created_at for efficient queries.`,
},
}
}

Voyage AI supports contextualized embeddings that improve retrieval quality:

inputs[i] = core.EmbeddingInput{
Text: doc.Content,
Context: doc.Title, // Adds document context to embedding
}

This helps the model understand that “the function returns an error” in an error handling document is different from the same phrase in an API reference.

Use different input types for queries and documents:

// For documents being indexed
resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
Model: "voyage-3-large",
Input: inputs,
InputType: core.InputTypeDocument,
})
// For search queries
resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
Model: "voyage-3-large",
Input: []core.EmbeddingInput{{Text: query}},
InputType: core.InputTypeQuery,
})

Reranking typically improves result quality by scoring each query-document pair directly with a cross-encoder model:

reranked, err := p.embedProvider.Rerank(ctx, &core.RerankRequest{
Model: "rerank-2",
Query: query,
Documents: docs,
TopK: len(docs),
ReturnDocuments: false,
})

The generation prompt enforces citation:

systemPrompt := `You are a helpful assistant that answers questions based on the provided context.
Rules:
1. Only use information from the provided sources
2. Cite sources using [Source N] format
3. If the context doesn't contain enough information, say so
4. Be concise but thorough`
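
If you need to map the [Source N] markers in the generated answer back to the retrieved documents, a small helper can extract them. This is not part of the example above; a minimal sketch using only the standard library:

import (
    "regexp"
    "strconv"
)

// citedSourceIndexes extracts the 1-based source numbers referenced in an
// answer, e.g. "see [Source 2]" yields 2. Order of first appearance is kept.
func citedSourceIndexes(answer string) []int {
    re := regexp.MustCompile(`\[Source (\d+)\]`)
    seen := make(map[int]bool)
    var idxs []int
    for _, m := range re.FindAllStringSubmatch(answer, -1) {
        n, err := strconv.Atoi(m[1])
        if err != nil || seen[n] {
            continue
        }
        seen[n] = true
        idxs = append(idxs, n)
    }
    return idxs
}

Each returned index corresponds to the same position in AnswerResult.Sources, so you can report only the sources the model actually cited.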

For production, replace the in-memory search with a vector database:

// Example with Qdrant
import "github.com/qdrant/go-client/qdrant"
func (p *RAGPipeline) searchQdrant(ctx context.Context, queryVec []float64, topK int) ([]SearchResult, error) {
client, err := qdrant.NewClient(&qdrant.Config{
Host: "localhost",
Port: 6334, // gRPC port (6333 is the REST port)
})
if err != nil {
return nil, err
}
// The Qdrant client works with float32 vectors; convert from float64
vec32 := make([]float32, len(queryVec))
for i, v := range queryVec {
vec32[i] = float32(v)
}
results, err := client.Query(ctx, &qdrant.QueryPoints{
CollectionName: "documents",
Query: qdrant.NewQuery(vec32...),
Limit: qdrant.PtrOf(uint64(topK)),
WithPayload: qdrant.NewWithPayload(true),
})
if err != nil {
return nil, err
}
searchResults := make([]SearchResult, len(results))
for i, r := range results {
searchResults[i] = SearchResult{
Document: Document{
ID: r.Id.GetUuid(),
Title: r.Payload["title"].GetStringValue(),
Content: r.Payload["content"].GetStringValue(),
},
Score: float64(r.Score),
}
}
return searchResults, nil
}

Cache embeddings to avoid redundant API calls:

type EmbeddingCache struct {
cache map[string][]float64
mu sync.RWMutex
}
func (c *EmbeddingCache) GetOrCompute(ctx context.Context, provider *voyageai.Provider, text string) ([]float64, error) {
key := hashText(text)
c.mu.RLock()
if emb, ok := c.cache[key]; ok {
c.mu.RUnlock()
return emb, nil
}
c.mu.RUnlock()
resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
Model: "voyage-3-large",
Input: []core.EmbeddingInput{{Text: text}},
})
if err != nil {
return nil, err
}
c.mu.Lock()
c.cache[key] = resp.Embeddings[0].Values
c.mu.Unlock()
return resp.Embeddings[0].Values, nil
}
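
The cache snippet above assumes a hashText helper for deriving cache keys and a constructor that allocates the map; both are illustrative, not part of the iris API. A minimal sketch using SHA-256:

import (
    "crypto/sha256"
    "encoding/hex"
)

// NewEmbeddingCache returns an empty cache ready for use.
func NewEmbeddingCache() *EmbeddingCache {
    return &EmbeddingCache{cache: make(map[string][]float64)}
}

// hashText returns a stable cache key for the given text.
func hashText(text string) string {
    sum := sha256.Sum256([]byte(text))
    return hex.EncodeToString(sum[:])
}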

For long answers, stream the response:

func (p *RAGPipeline) streamAnswer(ctx context.Context, query, docContext string, out chan<- string) error {
// systemPrompt is the same grounding prompt shown in generate above
stream, err := p.chatClient.Chat("gpt-4o").
System(systemPrompt).
User(fmt.Sprintf("Context:\n%s\n\nQuestion: %s", docContext, query)).
GetStream(ctx)
if err != nil {
return err
}
for chunk := range stream.Ch {
out <- chunk.Content
}
return <-stream.Err
}
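
A sketch of consuming the stream from main, assuming the pipeline, ctx, and query values from the full example and a docContext string built with buildContext (Answer normally does this internally):

out := make(chan string)
go func() {
    defer close(out)
    if err := pipeline.streamAnswer(ctx, query, docContext, out); err != nil {
        log.Println("stream error:", err)
    }
}()
// Print chunks as they arrive
for chunk := range out {
    fmt.Print(chunk)
}

Running the full example produces output along these lines: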
Question: How do I handle errors in the payment processing system?
Answer: Based on the documentation, payment error handling follows a structured approach:
1. **Error Capture** [Source 1]: The payment system logs errors to the `payments_errors`
table with full stack traces, transaction ID, amount, and error code.
2. **Retry Logic** [Source 3]: For idempotent operations, the system uses aggressive
retries with default config: max 3 attempts, initial delay 1s, max delay 30s,
multiplier 2.0. Always use idempotency keys for payment creation.
3. **Circuit Breakers** [Source 2]: Implement circuit breakers for external service
calls to prevent cascade failures.
4. **Monitoring** [Source 4]: Error rate > 5% triggers P2 alert, > 15% triggers P1.
All errors auto-report to PagerDuty.
Common error codes from [Source 1]: E001 (insufficient funds), E002 (card expired),
E003 (gateway timeout).
Sources:
- Payment Processing Guide (score: 0.892)
- Error Handling Best Practices (score: 0.847)
- Retry Policies (score: 0.823)
- Monitoring and Alerting (score: 0.756)
| Practice | Recommendation |
| --- | --- |
| Chunk size | 512-1024 tokens for documents |
| Overlap | 10-20% overlap between chunks |
| Top K retrieval | 10-20 documents initially |
| Rerank to | Top 3-5 documents for context |
| Temperature | 0.0-0.3 for factual answers |
| Context limit | Leave 25% of context window for answer |
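
The example indexes each sample document whole; the chunk size and overlap guidance above assumes longer documents are split before embedding. A rough character-based splitter with overlap, illustrative only (production code would usually split on token counts and sentence boundaries):

// chunkText splits text into overlapping chunks of roughly chunkSize
// characters. Overlap carries trailing context into the next chunk so
// passages that straddle a boundary remain retrievable.
func chunkText(text string, chunkSize, overlap int) []string {
    if chunkSize <= 0 || overlap < 0 || overlap >= chunkSize {
        return []string{text}
    }
    var chunks []string
    runes := []rune(text)
    step := chunkSize - overlap
    for start := 0; start < len(runes); start += step {
        end := min(start+chunkSize, len(runes))
        chunks = append(chunks, string(runes[start:end]))
        if end == len(runes) {
            break
        }
    }
    return chunks
}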