This example demonstrates a production-ready retrieval-augmented generation (RAG) pipeline that combines vector search with language models to answer questions grounded in your document corpus.
A complete RAG system that embeds the query, searches a vector index, reranks the results, builds a grounded context, and generates a cited answer:
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│    Query    │────▶│    Embed    │────▶│   Search    │────▶│   Rerank    │
│             │     │    Query    │     │  Vector DB  │     │   Results   │
└─────────────┘     └─────────────┘     └─────────────┘     └─────────────┘
                                                                   │
                                                                   ▼
                                        ┌─────────────┐     ┌─────────────┐
                                        │   Answer    │◀────│    Build    │
                                        │   + Cite    │     │   Context   │
                                        └─────────────┘     └─────────────┘
```

```bash
# Install dependencies
go get github.com/petal-labs/iris
```
```bash
# Set up API keys
iris keys set openai
iris keys set voyageai
```

```go
package main
import (
    "context"
    "fmt"
    "log"
    "math"
    "sort"
    "strings"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/openai"
    "github.com/petal-labs/iris/providers/voyageai"
)
// Document represents a searchable document.
type Document struct {
    ID       string
    Title    string
    Content  string
    Metadata map[string]string
}

// SearchResult contains a document with its relevance score.
type SearchResult struct {
    Document Document
    Score    float64
}

// RAGPipeline orchestrates the retrieval and generation process.
type RAGPipeline struct {
    embedProvider *voyageai.Provider
    chatProvider  *openai.Provider
    chatClient    *core.Client
    documents     []Document
    embeddings    map[string][]float64
}
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
    defer cancel()

    // Initialize the pipeline
    pipeline, err := NewRAGPipeline()
    if err != nil {
        log.Fatal(err)
    }

    // Load and index documents
    documents := loadSampleDocuments()
    if err := pipeline.IndexDocuments(ctx, documents); err != nil {
        log.Fatal(err)
    }

    // Answer a question
    query := "How do I handle errors in the payment processing system?"
    answer, err := pipeline.Answer(ctx, query)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println("Question:", query)
    fmt.Println("\nAnswer:", answer.Text)
    fmt.Println("\nSources:")
    for _, source := range answer.Sources {
        fmt.Printf("  - %s (score: %.3f)\n", source.Title, source.Score)
    }
}
// NewRAGPipeline creates a new RAG pipeline with configured providers.
func NewRAGPipeline() (*RAGPipeline, error) {
    // Initialize embedding provider (Voyage AI for quality)
    embedProvider, err := voyageai.NewFromKeystore()
    if err != nil {
        embedProvider, err = voyageai.NewFromEnv()
        if err != nil {
            return nil, fmt.Errorf("voyage AI not configured: %w", err)
        }
    }

    // Initialize chat provider (OpenAI for generation)
    chatProvider, err := openai.NewFromKeystore()
    if err != nil {
        chatProvider, err = openai.NewFromEnv()
        if err != nil {
            return nil, fmt.Errorf("openai not configured: %w", err)
        }
    }

    chatClient := core.NewClient(chatProvider,
        core.WithRetryPolicy(&core.RetryPolicy{
            MaxRetries:        3,
            InitialInterval:   1 * time.Second,
            MaxInterval:       30 * time.Second,
            BackoffMultiplier: 2.0,
            RetryOn:           []int{429, 500, 503},
        }),
    )

    return &RAGPipeline{
        embedProvider: embedProvider,
        chatProvider:  chatProvider,
        chatClient:    chatClient,
        embeddings:    make(map[string][]float64),
    }, nil
}
// IndexDocuments embeds and indexes a collection of documents.
func (p *RAGPipeline) IndexDocuments(ctx context.Context, docs []Document) error {
    p.documents = docs

    // Prepare embedding inputs with context for better retrieval
    inputs := make([]core.EmbeddingInput, len(docs))
    for i, doc := range docs {
        inputs[i] = core.EmbeddingInput{
            Text:    doc.Content,
            Context: doc.Title, // Contextualized embedding
        }
    }

    // Generate embeddings in batches
    const batchSize = 64
    for i := 0; i < len(inputs); i += batchSize {
        end := min(i+batchSize, len(inputs))
        batch := inputs[i:end]

        resp, err := p.embedProvider.Embeddings(ctx, &core.EmbeddingRequest{
            Model:     "voyage-3-large",
            Input:     batch,
            InputType: core.InputTypeDocument,
        })
        if err != nil {
            return fmt.Errorf("embedding batch %d: %w", i/batchSize, err)
        }

        for j, emb := range resp.Embeddings {
            docIdx := i + j
            p.embeddings[docs[docIdx].ID] = emb.Values
        }
    }

    fmt.Printf("Indexed %d documents\n", len(docs))
    return nil
}
// Answer generates a grounded response to a query.
func (p *RAGPipeline) Answer(ctx context.Context, query string) (*AnswerResult, error) {
    // Step 1: Embed the query
    queryEmb, err := p.embedProvider.Embeddings(ctx, &core.EmbeddingRequest{
        Model:     "voyage-3-large",
        Input:     []core.EmbeddingInput{{Text: query}},
        InputType: core.InputTypeQuery,
    })
    if err != nil {
        return nil, fmt.Errorf("embedding query: %w", err)
    }

    // Step 2: Search for relevant documents
    results := p.search(queryEmb.Embeddings[0].Values, 10)

    // Step 3: Rerank results for better relevance
    reranked, err := p.rerank(ctx, query, results)
    if err != nil {
        return nil, fmt.Errorf("reranking: %w", err)
    }

    // Step 4: Build context from top results
    contextDocs := reranked[:min(5, len(reranked))]
    context := p.buildContext(contextDocs)

    // Step 5: Generate grounded answer
    answer, err := p.generate(ctx, query, context, contextDocs)
    if err != nil {
        return nil, fmt.Errorf("generating answer: %w", err)
    }

    return answer, nil
}
// search performs brute-force vector similarity search over the in-memory index.
func (p *RAGPipeline) search(queryVec []float64, topK int) []SearchResult {
    results := make([]SearchResult, 0, len(p.documents))
    for _, doc := range p.documents {
        docVec := p.embeddings[doc.ID]
        results = append(results, SearchResult{
            Document: doc,
            Score:    cosineSimilarity(queryVec, docVec),
        })
    }

    // Sort by score descending
    sort.Slice(results, func(i, j int) bool {
        return results[i].Score > results[j].Score
    })

    // Return top K
    return results[:min(topK, len(results))]
}
// rerank improves result ordering using a reranking model.
func (p *RAGPipeline) rerank(ctx context.Context, query string, results []SearchResult) ([]SearchResult, error) {
    if len(results) == 0 {
        return results, nil
    }

    // Extract document texts
    docs := make([]string, len(results))
    for i, r := range results {
        docs[i] = r.Document.Content
    }

    // Call reranking API
    reranked, err := p.embedProvider.Rerank(ctx, &core.RerankRequest{
        Model:           "rerank-2",
        Query:           query,
        Documents:       docs,
        TopK:            len(docs),
        ReturnDocuments: false,
    })
    if err != nil {
        // Fall back to original ordering if reranking fails
        return results, nil
    }

    // Reorder results based on reranking scores
    reorderedResults := make([]SearchResult, len(reranked.Results))
    for i, r := range reranked.Results {
        reorderedResults[i] = SearchResult{
            Document: results[r.Index].Document,
            Score:    r.Score,
        }
    }

    return reorderedResults, nil
}
// buildContext creates a formatted context string from documents.
func (p *RAGPipeline) buildContext(results []SearchResult) string {
    var sb strings.Builder
    for i, r := range results {
        sb.WriteString(fmt.Sprintf("[Source %d: %s]\n", i+1, r.Document.Title))
        sb.WriteString(r.Document.Content)
        sb.WriteString("\n\n")
    }
    return sb.String()
}

// AnswerResult contains the generated answer with sources.
type AnswerResult struct {
    Text    string
    Sources []SourceRef
}

// SourceRef references a source document.
type SourceRef struct {
    Title string
    ID    string
    Score float64
}
// generate creates a grounded answer using the chat model.
func (p *RAGPipeline) generate(ctx context.Context, query, context string, sources []SearchResult) (*AnswerResult, error) {
    systemPrompt := `You are a helpful assistant that answers questions based on the provided context.
Rules:
1. Only use information from the provided sources
2. Cite sources using [Source N] format
3. If the context doesn't contain enough information, say so
4. Be concise but thorough`

    userPrompt := fmt.Sprintf(`Context:
%s

Question: %s

Provide a comprehensive answer based on the sources above.`, context, query)

    resp, err := p.chatClient.Chat("gpt-4o").
        System(systemPrompt).
        User(userPrompt).
        Temperature(0.3).
        MaxTokens(1000).
        GetResponse(ctx)
    if err != nil {
        return nil, err
    }

    // Build source references
    sourceRefs := make([]SourceRef, len(sources))
    for i, s := range sources {
        sourceRefs[i] = SourceRef{
            Title: s.Document.Title,
            ID:    s.Document.ID,
            Score: s.Score,
        }
    }

    return &AnswerResult{
        Text:    resp.Output,
        Sources: sourceRefs,
    }, nil
}
// cosineSimilarity computes the cosine of the angle between two vectors:
// dot(a, b) / (||a|| * ||b||). Returns 0 for mismatched or zero vectors.
func cosineSimilarity(a, b []float64) float64 {
    if len(a) != len(b) {
        return 0
    }

    var dot, normA, normB float64
    for i := range a {
        dot += a[i] * b[i]
        normA += a[i] * a[i]
        normB += b[i] * b[i]
    }

    if normA == 0 || normB == 0 {
        return 0
    }

    return dot / (math.Sqrt(normA) * math.Sqrt(normB))
}
// min returns the smaller of two ints (a builtin in Go 1.21+, defined here
// for compatibility with older toolchains).
func min(a, b int) int {
    if a < b {
        return a
    }
    return b
}
// loadSampleDocuments returns sample documents for demonstration.
func loadSampleDocuments() []Document {
    return []Document{
        {
            ID:    "doc-001",
            Title: "Payment Processing Guide",
            Content: `The payment processing system handles all transactions through a multi-step pipeline.
First, payments are validated for correct format and sufficient funds. Then, they're sent to the
payment gateway for authorization. If authorization fails, the system retries up to 3 times with
exponential backoff. Errors are logged to the payments_errors table with full stack traces.
Common error codes: E001 (insufficient funds), E002 (card expired), E003 (gateway timeout).`,
        },
        {
            ID:    "doc-002",
            Title: "Error Handling Best Practices",
            Content: `All services should implement structured error handling. Use typed errors that implement
the error interface. Wrap errors with context using fmt.Errorf with %w verb. Log errors at the
point of origin with full context. For payment errors specifically, always include transaction ID,
amount, and error code. Implement circuit breakers for external service calls to prevent cascade
failures.`,
        },
        {
            ID:    "doc-003",
            Title: "Retry Policies",
            Content: `Retry policies should be configured per-service based on failure characteristics.
For idempotent operations like payment status checks, aggressive retries are safe. For
non-idempotent operations like payment creation, use idempotency keys and conservative retries.
Default retry config: max 3 attempts, initial delay 1s, max delay 30s, multiplier 2.0.
Always implement jitter to prevent thundering herd problems.`,
        },
        {
            ID:    "doc-004",
            Title: "Monitoring and Alerting",
            Content: `Payment system monitoring includes real-time metrics for transaction volume,
success rates, and latency percentiles. Alert thresholds: error rate > 5% triggers P2,
error rate > 15% triggers P1. All payment errors are automatically reported to PagerDuty
with transaction context. Dashboard available at /metrics/payments.`,
        },
        {
            ID:    "doc-005",
            Title: "Database Schema",
            Content: `The payments table stores all transaction records. Key columns: id (UUID),
amount (decimal), currency (varchar), status (enum), created_at (timestamp), updated_at (timestamp).
The payments_errors table stores error details: id, payment_id (FK), error_code, error_message,
stack_trace, created_at. Indexes exist on payment_id and created_at for efficient queries.`,
        },
    }
}
```

```go
// Alternative: Use Anthropic Claude for generation
import "github.com/petal-labs/iris/providers/anthropic"
func NewRAGPipelineWithClaude() (*RAGPipeline, error) {
    embedProvider, err := voyageai.NewFromKeystore()
    if err != nil {
        return nil, err
    }

    chatProvider, err := anthropic.NewFromKeystore()
    if err != nil {
        return nil, err
    }

    chatClient := core.NewClient(chatProvider)

    return &RAGPipeline{
        embedProvider: embedProvider,
        chatProvider:  nil, // Using Anthropic instead
        chatClient:    chatClient,
        embeddings:    make(map[string][]float64),
    }, nil
}
// Generate with Claude
func (p *RAGPipeline) generateWithClaude(ctx context.Context, query, context string, sources []SearchResult) (*AnswerResult, error) {
    resp, err := p.chatClient.Chat("claude-sonnet-4-20250514").
        System("Answer based on the provided context. Cite sources using [Source N] format.").
        User(fmt.Sprintf("Context:\n%s\n\nQuestion: %s", context, query)).
        Temperature(0.3).
        MaxTokens(1000).
        GetResponse(ctx)
    if err != nil {
        return nil, err
    }

    sourceRefs := make([]SourceRef, len(sources))
    for i, s := range sources {
        sourceRefs[i] = SourceRef{
            Title: s.Document.Title,
            ID:    s.Document.ID,
            Score: s.Score,
        }
    }

    return &AnswerResult{
        Text:    resp.Output,
        Sources: sourceRefs,
    }, nil
}
```

Voyage AI supports contextualized embeddings that improve retrieval quality:
```go
inputs[i] = core.EmbeddingInput{
    Text:    doc.Content,
    Context: doc.Title, // Adds document context to embedding
}
```

This helps the model understand that “the function returns an error” in an error handling document is different from the same phrase in an API reference.
Use different input types for queries and documents:
```go
// For documents being indexed
resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
    Model:     "voyage-3-large",
    Input:     inputs,
    InputType: core.InputTypeDocument,
})
```

```go
// For search queries
resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
    Model:     "voyage-3-large",
    Input:     []core.EmbeddingInput{{Text: query}},
    InputType: core.InputTypeQuery,
})
```

Reranking dramatically improves result quality by using a cross-encoder model:
```go
reranked, err := p.embedProvider.Rerank(ctx, &core.RerankRequest{
    Model:           "rerank-2",
    Query:           query,
    Documents:       docs,
    TopK:            len(docs),
    ReturnDocuments: false,
})
```

The generation prompt enforces citation:
```go
systemPrompt := `You are a helpful assistant that answers questions based on the provided context.
Rules:
1. Only use information from the provided sources
2. Cite sources using [Source N] format
3. If the context doesn't contain enough information, say so
4. Be concise but thorough`
```

For production, replace the in-memory search with a vector database:
```go
// Example with Qdrant
import "github.com/qdrant/go-client/qdrant"

func (p *RAGPipeline) searchQdrant(ctx context.Context, queryVec []float64, topK int) ([]SearchResult, error) {
    client, err := qdrant.NewClient(&qdrant.Config{
        Host: "localhost",
        Port: 6334, // the Go client speaks gRPC (6334), not the REST port 6333
    })
    if err != nil {
        return nil, err
    }

    // The Qdrant client works with float32 vectors
    vec := make([]float32, len(queryVec))
    for i, v := range queryVec {
        vec[i] = float32(v)
    }

    results, err := client.Query(ctx, &qdrant.QueryPoints{
        CollectionName: "documents",
        Query:          qdrant.NewQuery(vec...),
        Limit:          qdrant.PtrOf(uint64(topK)),
        WithPayload:    qdrant.NewWithPayload(true),
    })
    if err != nil {
        return nil, err
    }

    searchResults := make([]SearchResult, len(results))
    for i, r := range results {
        searchResults[i] = SearchResult{
            Document: Document{
                ID:      r.Id.GetUuid(),
                Title:   r.Payload["title"].GetStringValue(),
                Content: r.Payload["content"].GetStringValue(),
            },
            Score: float64(r.Score),
        }
    }
    return searchResults, nil
}
```

Cache embeddings to avoid redundant API calls:
```go
import (
    "crypto/sha256"
    "encoding/hex"
    "sync"
)

// EmbeddingCache memoizes embeddings keyed by a hash of the input text.
type EmbeddingCache struct {
    cache map[string][]float64
    mu    sync.RWMutex
}

// hashText derives a stable cache key from the text. The original helper
// isn't shown; a SHA-256 hex digest is one straightforward choice.
func hashText(text string) string {
    sum := sha256.Sum256([]byte(text))
    return hex.EncodeToString(sum[:])
}
func (c *EmbeddingCache) GetOrCompute(ctx context.Context, provider *voyageai.Provider, text string) ([]float64, error) {
    key := hashText(text)

    c.mu.RLock()
    if emb, ok := c.cache[key]; ok {
        c.mu.RUnlock()
        return emb, nil
    }
    c.mu.RUnlock()

    resp, err := provider.Embeddings(ctx, &core.EmbeddingRequest{
        Model: "voyage-3-large",
        Input: []core.EmbeddingInput{{Text: text}},
    })
    if err != nil {
        return nil, err
    }

    c.mu.Lock()
    c.cache[key] = resp.Embeddings[0].Values
    c.mu.Unlock()

    return resp.Embeddings[0].Values, nil
}
```
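A quick usage sketch (hypothetical wiring, not part of the pipeline above; the cache map must be initialized before use):

```go
cache := &EmbeddingCache{cache: make(map[string][]float64)}

// First call hits the API; repeated calls for the same text are served from memory.
emb, err := cache.GetOrCompute(ctx, embedProvider, doc.Content)
if err != nil {
    log.Fatal(err)
}
_ = emb // use the cached or freshly computed vector
```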
For long answers, stream the response:

```go
// streamAnswer streams the grounded answer chunk-by-chunk into out.
func (p *RAGPipeline) streamAnswer(ctx context.Context, query, context string, out chan<- string) error {
    // systemPrompt is the same grounding prompt used in generate above.
    stream, err := p.chatClient.Chat("gpt-4o").
        System(systemPrompt).
        User(fmt.Sprintf("Context:\n%s\n\nQuestion: %s", context, query)).
        GetStream(ctx)
    if err != nil {
        return err
    }

    for chunk := range stream.Ch {
        out <- chunk.Content
    }

    return <-stream.Err
}
```

Running the pipeline against the sample documents produces output like:

Question: How do I handle errors in the payment processing system?
Answer: Based on the documentation, payment error handling follows a structured approach:
1. **Error Capture** [Source 1]: The payment system logs errors to the `payments_errors` table with full stack traces, transaction ID, amount, and error code.
2. **Retry Logic** [Source 3]: For idempotent operations, the system uses aggressive retries with default config: max 3 attempts, initial delay 1s, max delay 30s, multiplier 2.0. Always use idempotency keys for payment creation.
3. **Circuit Breakers** [Source 2]: Implement circuit breakers for external service calls to prevent cascade failures.
4. **Monitoring** [Source 4]: Error rate > 5% triggers P2 alert, > 15% triggers P1. All errors auto-report to PagerDuty.
Common error codes from [Source 1]: E001 (insufficient funds), E002 (card expired), E003 (gateway timeout).
Sources:
- Payment Processing Guide (score: 0.892)
- Error Handling Best Practices (score: 0.847)
- Retry Policies (score: 0.823)
- Monitoring and Alerting (score: 0.756)

Best practices for a production RAG pipeline:

| Practice | Recommendation |
|---|---|
| Chunk size | 512-1024 tokens for documents |
| Overlap | 10-20% overlap between chunks |
| Top K retrieval | 10-20 documents initially |
| Rerank to | Top 3-5 documents for context |
| Temperature | 0.0-0.3 for factual answers |
| Context limit | Leave 25% of context window for answer |
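The chunking guidance above can be applied with a simple word-based splitter. This is a minimal sketch, not part of the library: it approximates tokens by whitespace-delimited words (roughly 0.75 words per token for English text), and `chunkWords` is a hypothetical helper name.

```go
// chunkWords splits text into overlapping chunks, measured in words as a
// rough proxy for tokens. For ~512-1024 tokens with 10-20% overlap, a call
// like chunkWords(text, 700, 100) lands in the recommended range.
func chunkWords(text string, chunkSize, overlap int) []string {
    words := strings.Fields(text)
    if chunkSize <= overlap {
        return nil // the window would never advance
    }

    var chunks []string
    step := chunkSize - overlap
    for start := 0; start < len(words); start += step {
        end := min(start+chunkSize, len(words))
        chunks = append(chunks, strings.Join(words[start:end], " "))
        if end == len(words) {
            break
        }
    }
    return chunks
}
```

Each chunk would then be indexed as its own Document, carrying the parent title in the Context field so the contextualized embedding preserves provenance.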
- Batch Embeddings → Scale up document processing.
- Voyage AI Provider → Learn more about embeddings.