This example demonstrates production patterns for generating embeddings at scale. You’ll learn how to process large document collections with batching, rate limiting, progress tracking, checkpointing, and incremental persistence to vector stores.
A batch embedding pipeline that:

- Processes documents in batches of a configurable size
- Enforces a requests-per-minute rate limit
- Reports progress through a callback
- Checkpoints progress so interrupted runs can resume
- Persists embeddings incrementally to a vector store
```go
package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/voyageai"
)

// Document represents a source document to be embedded.
type Document struct {
    ID       string            `json:"id"`
    Content  string            `json:"content"`
    Metadata map[string]string `json:"metadata,omitempty"`
}

// EmbeddedDocument contains a document with its embedding vector.
type EmbeddedDocument struct {
    Document
    Embedding []float32 `json:"embedding"`
    Model     string    `json:"model"`
    CreatedAt time.Time `json:"created_at"`
}

// BatchConfig controls the embedding pipeline behavior.
type BatchConfig struct {
    BatchSize       int           // Documents per batch (default: 128)
    MaxConcurrency  int           // Parallel batch workers (default: 3)
    RequestsPerMin  int           // Rate limit (default: 300 for Voyage AI)
    RetryAttempts   int           // Retries per batch (default: 3)
    RetryDelay      time.Duration // Initial retry delay (default: 1s)
    CheckpointEvery int           // Save progress every N documents (default: 1000)
}

// DefaultBatchConfig returns sensible defaults for production use.
func DefaultBatchConfig() BatchConfig {
    return BatchConfig{
        BatchSize:       128,
        MaxConcurrency:  3,
        RequestsPerMin:  300,
        RetryAttempts:   3,
        RetryDelay:      time.Second,
        CheckpointEvery: 1000,
    }
}

// ProgressCallback receives updates during batch processing.
type ProgressCallback func(processed, total int, lastBatchDuration time.Duration)

// VectorStore defines the interface for persisting embeddings.
type VectorStore interface {
    // Upsert stores or updates embedded documents.
    Upsert(ctx context.Context, docs []EmbeddedDocument) error
    // Close releases any resources.
    Close() error
}

// BatchEmbedder processes documents in batches with rate limiting.
type BatchEmbedder struct {
    provider   core.EmbeddingProvider
    model      string
    config     BatchConfig
    store      VectorStore
    onProgress ProgressCallback

    // Rate limiting
    rateLimiter *time.Ticker

    // Checkpointing
    checkpointFile string
    checkpoint     *Checkpoint
    mu             sync.Mutex
}

// Checkpoint tracks progress for resumable processing.
type Checkpoint struct {
    ProcessedIDs map[string]bool `json:"processed_ids"`
    LastUpdated  time.Time       `json:"last_updated"`
    TotalCount   int             `json:"total_count"`
}

// NewBatchEmbedder creates a new batch processor.
func NewBatchEmbedder(provider core.EmbeddingProvider, model string, config BatchConfig) *BatchEmbedder {
    // One request is released per interval to stay under the per-minute limit.
    interval := time.Minute / time.Duration(config.RequestsPerMin)

    return &BatchEmbedder{
        provider:    provider,
        model:       model,
        config:      config,
        rateLimiter: time.NewTicker(interval),
        checkpoint:  &Checkpoint{ProcessedIDs: make(map[string]bool)},
    }
}

// WithStore sets the vector store for persistence.
func (be *BatchEmbedder) WithStore(store VectorStore) *BatchEmbedder {
    be.store = store
    return be
}

// WithProgress sets a progress callback.
func (be *BatchEmbedder) WithProgress(cb ProgressCallback) *BatchEmbedder {
    be.onProgress = cb
    return be
}

// WithCheckpoint enables resumable processing from a file.
func (be *BatchEmbedder) WithCheckpoint(filepath string) *BatchEmbedder {
    be.checkpointFile = filepath
    be.loadCheckpoint()
    return be
}

func (be *BatchEmbedder) loadCheckpoint() {
    data, err := os.ReadFile(be.checkpointFile)
    if err != nil {
        return // No checkpoint exists yet
    }
    if err := json.Unmarshal(data, be.checkpoint); err != nil {
        log.Printf("Warning: ignoring unreadable checkpoint: %v", err)
    }
}

func (be *BatchEmbedder) saveCheckpoint() error {
    be.mu.Lock()
    defer be.mu.Unlock()

    be.checkpoint.LastUpdated = time.Now()
    data, err := json.MarshalIndent(be.checkpoint, "", "  ")
    if err != nil {
        return err
    }
    return os.WriteFile(be.checkpointFile, data, 0644)
}
```
```go
// Process embeds all documents and stores them.
func (be *BatchEmbedder) Process(ctx context.Context, docs []Document) error {
    // Filter already-processed documents
    var pending []Document
    for _, doc := range docs {
        if !be.checkpoint.ProcessedIDs[doc.ID] {
            pending = append(pending, doc)
        }
    }

    if len(pending) == 0 {
        log.Println("All documents already processed")
        return nil
    }

    be.checkpoint.TotalCount = len(docs)
    total := len(docs)
    processed := total - len(pending)

    log.Printf("Processing %d documents (%d already complete)", len(pending), processed)

    // Create batches
    var batches [][]Document
    for i := 0; i < len(pending); i += be.config.BatchSize {
        end := i + be.config.BatchSize
        if end > len(pending) {
            end = len(pending)
        }
        batches = append(batches, pending[i:end])
    }

    // Process with concurrency control
    results := make(chan []EmbeddedDocument, len(batches))
    errCh := make(chan error, len(batches))

    var wg sync.WaitGroup
    semaphore := make(chan struct{}, be.config.MaxConcurrency)

    for batchNum, batch := range batches {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
        }

        wg.Add(1)
        go func(num int, batchDocs []Document) {
            defer wg.Done()

            semaphore <- struct{}{}        // Acquire
            defer func() { <-semaphore }() // Release

            // Wait for rate limit
            <-be.rateLimiter.C

            start := time.Now()
            embedded, err := be.processBatch(ctx, batchDocs)
            duration := time.Since(start)

            if err != nil {
                errCh <- fmt.Errorf("batch %d: %w", num, err)
                return
            }

            results <- embedded

            // Update shared progress state under the lock
            be.mu.Lock()
            for _, doc := range embedded {
                be.checkpoint.ProcessedIDs[doc.ID] = true
            }
            processed += len(embedded)
            done := processed
            // Checkpoint whenever progress crosses a CheckpointEvery boundary
            crossed := done/be.config.CheckpointEvery != (done-len(embedded))/be.config.CheckpointEvery
            be.mu.Unlock()

            if be.onProgress != nil {
                be.onProgress(done, total, duration)
            }

            // Periodic checkpoint
            if crossed && be.checkpointFile != "" {
                be.saveCheckpoint()
            }
        }(batchNum, batch)
    }

    // Collect results in background
    go func() {
        wg.Wait()
        close(results)
        close(errCh)
    }()

    // Store results as they arrive
    var allEmbedded []EmbeddedDocument
    for embedded := range results {
        allEmbedded = append(allEmbedded, embedded...)

        // Incremental persistence
        if be.store != nil && len(allEmbedded) >= be.config.BatchSize*2 {
            if err := be.store.Upsert(ctx, allEmbedded); err != nil {
                log.Printf("Warning: store upsert failed: %v", err)
            } else {
                allEmbedded = nil // Clear after successful store
            }
        }
    }

    // Store remaining
    if be.store != nil && len(allEmbedded) > 0 {
        if err := be.store.Upsert(ctx, allEmbedded); err != nil {
            return fmt.Errorf("final store upsert: %w", err)
        }
    }

    // Check for errors
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }
    if len(errs) > 0 {
        return fmt.Errorf("batch processing errors: %v", errs)
    }

    // Final checkpoint
    if be.checkpointFile != "" {
        be.saveCheckpoint()
    }

    return nil
}
```
```go
func (be *BatchEmbedder) processBatch(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    var lastErr error

    for attempt := 0; attempt < be.config.RetryAttempts; attempt++ {
        if attempt > 0 {
            // Exponential backoff: RetryDelay, 2x, 4x, ...
            delay := be.config.RetryDelay * time.Duration(1<<(attempt-1))
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case <-time.After(delay):
            }
        }

        embedded, err := be.embedBatch(ctx, docs)
        if err == nil {
            return embedded, nil
        }

        lastErr = err

        // Check if the error is retryable
        if apiErr, ok := err.(*core.APIError); ok {
            if apiErr.StatusCode == 429 {
                // Rate limited - wait longer, but stay cancellable
                select {
                case <-ctx.Done():
                    return nil, ctx.Err()
                case <-time.After(time.Duration(attempt+1) * 5 * time.Second):
                }
                continue
            }
            if apiErr.StatusCode >= 500 {
                continue // Server error, retry
            }
            return nil, err // Client error, don't retry
        }
    }

    return nil, fmt.Errorf("after %d attempts: %w", be.config.RetryAttempts, lastErr)
}

func (be *BatchEmbedder) embedBatch(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    // Build embedding inputs
    inputs := make([]core.EmbeddingInput, len(docs))
    for i, doc := range docs {
        inputs[i] = core.EmbeddingInput{
            Text: doc.Content,
        }
    }

    req := &core.EmbeddingRequest{
        Model:     be.model,
        Input:     inputs,
        InputType: core.InputTypeDocument,
    }

    resp, err := be.provider.Embeddings(ctx, req)
    if err != nil {
        return nil, err
    }

    if len(resp.Embeddings) != len(docs) {
        return nil, fmt.Errorf("embedding count mismatch: got %d, expected %d",
            len(resp.Embeddings), len(docs))
    }

    // Build embedded documents
    embedded := make([]EmbeddedDocument, len(docs))
    for i, doc := range docs {
        embedded[i] = EmbeddedDocument{
            Document:  doc,
            Embedding: resp.Embeddings[i].Values,
            Model:     be.model,
            CreatedAt: time.Now(),
        }
    }

    return embedded, nil
}

// Close releases resources.
func (be *BatchEmbedder) Close() {
    be.rateLimiter.Stop()
    if be.store != nil {
        be.store.Close()
    }
}
```
```go
func main() {
    // Initialize provider from keystore, falling back to the environment
    provider, err := voyageai.NewFromKeystore()
    if err != nil {
        provider, err = voyageai.NewFromEnv()
        if err != nil {
            log.Fatal("No Voyage AI key. Run: iris keys set voyageai")
        }
    }

    // Load documents (example: from JSON file)
    docs, err := loadDocuments("documents.json")
    if err != nil {
        log.Fatalf("Failed to load documents: %v", err)
    }

    // Configure batch processing
    config := DefaultBatchConfig()
    config.BatchSize = 128      // Voyage AI supports up to 128
    config.MaxConcurrency = 3   // Parallel batches
    config.RequestsPerMin = 300 // Stay under rate limit

    // Create embedder with Qdrant storage
    store, err := NewQdrantStore("http://localhost:6333", "documents")
    if err != nil {
        log.Fatalf("Failed to connect to Qdrant: %v", err)
    }

    embedder := NewBatchEmbedder(provider, "voyage-3-large", config).
        WithStore(store).
        WithCheckpoint("embedding_progress.json").
        WithProgress(func(processed, total int, duration time.Duration) {
            pct := float64(processed) / float64(total) * 100
            log.Printf("Progress: %d/%d (%.1f%%) - last batch: %v",
                processed, total, pct, duration)
        })
    defer embedder.Close()

    // Handle graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        log.Println("Shutting down gracefully...")
        cancel()
    }()

    // Process all documents
    start := time.Now()
    if err := embedder.Process(ctx, docs); err != nil {
        if ctx.Err() != nil {
            log.Println("Processing interrupted - progress saved to checkpoint")
        } else {
            log.Fatalf("Processing failed: %v", err)
        }
    }

    log.Printf("Completed in %v", time.Since(start))
}

func loadDocuments(filepath string) ([]Document, error) {
    data, err := os.ReadFile(filepath)
    if err != nil {
        return nil, err
    }

    var docs []Document
    if err := json.Unmarshal(data, &docs); err != nil {
        return nil, err
    }

    return docs, nil
}
```
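`loadDocuments` expects a top-level JSON array matching the `Document` struct tags. A minimal `documents.json` (the content here is hypothetical) looks like:

```json
[
  {
    "id": "doc-001",
    "content": "Iris is a multi-provider LLM toolkit for Go.",
    "metadata": {"title": "Overview", "source": "docs"}
  },
  {
    "id": "doc-002",
    "content": "Embeddings map text to vectors for similarity search."
  }
]
```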
"github.com/qdrant/go-client/qdrant")
```go
type QdrantStore struct {
    client     *qdrant.Client
    collection string
}

func NewQdrantStore(url, collection string) (*QdrantStore, error) {
    client, err := qdrant.NewClient(&qdrant.Config{
        Host: url,
    })
    if err != nil {
        return nil, fmt.Errorf("connect to Qdrant: %w", err)
    }

    return &QdrantStore{
        client:     client,
        collection: collection,
    }, nil
}

func (s *QdrantStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
    points := make([]*qdrant.PointStruct, len(docs))

    for i, doc := range docs {
        // Convert []float32 to []float64 for Qdrant
        vector := make([]float64, len(doc.Embedding))
        for j, v := range doc.Embedding {
            vector[j] = float64(v)
        }

        payload := map[string]interface{}{
            "content":    doc.Content,
            "metadata":   doc.Metadata,
            "model":      doc.Model,
            "created_at": doc.CreatedAt.Format(time.RFC3339),
        }

        points[i] = &qdrant.PointStruct{
            Id:      qdrant.NewIDUUID(doc.ID),
            Vectors: qdrant.NewVectorsDense(vector),
            Payload: qdrant.NewValueMap(payload),
        }
    }

    _, err := s.client.Upsert(ctx, &qdrant.UpsertPoints{
        CollectionName: s.collection,
        Points:         points,
    })

    return err
}
```
```go
func (s *QdrantStore) Close() error {
    return s.client.Close()
}
```

An alternative `VectorStore` backed by PostgreSQL with the pgvector extension:

```go
import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "strings"

    _ "github.com/lib/pq" // registers the postgres driver
)
```
```go
type PgVectorStore struct {
    db        *sql.DB
    tableName string
}

func NewPgVectorStore(connStr, tableName string) (*PgVectorStore, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, fmt.Errorf("connect to PostgreSQL: %w", err)
    }

    // Ensure pgvector extension exists
    _, err = db.Exec("CREATE EXTENSION IF NOT EXISTS vector")
    if err != nil {
        return nil, fmt.Errorf("create vector extension: %w", err)
    }

    return &PgVectorStore{
        db:        db,
        tableName: tableName,
    }, nil
}
```
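The constructor creates the extension but not the table itself. A minimal schema sketch follows; `ensureTable` is a hypothetical helper, and the 1024-dimension column is an assumption matching voyage-3-large — adjust it to your model:

```go
// ensureTable is a hypothetical helper, not part of the pipeline above.
// The vector dimension (1024 here) must match the embedding model in use.
func (s *PgVectorStore) ensureTable() error {
    _, err := s.db.Exec(fmt.Sprintf(`
        CREATE TABLE IF NOT EXISTS %s (
            id         TEXT PRIMARY KEY,
            content    TEXT NOT NULL,
            embedding  vector(1024),
            metadata   JSONB,
            model      TEXT,
            created_at TIMESTAMPTZ
        )`, s.tableName))
    return err
}
```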
```go
func (s *PgVectorStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(`
        INSERT INTO %s (id, content, embedding, metadata, model, created_at)
        VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (id) DO UPDATE SET
            content = EXCLUDED.content,
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata,
            model = EXCLUDED.model,
            created_at = EXCLUDED.created_at
    `, s.tableName))
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, doc := range docs {
        // Convert embedding to PostgreSQL vector format
        vectorStr := fmt.Sprintf("[%s]", floatsToString(doc.Embedding))

        // database/sql cannot bind a Go map directly; store metadata as JSON
        metadataJSON, err := json.Marshal(doc.Metadata)
        if err != nil {
            return fmt.Errorf("marshal metadata for %s: %w", doc.ID, err)
        }

        _, err = stmt.ExecContext(ctx,
            doc.ID,
            doc.Content,
            vectorStr,
            metadataJSON,
            doc.Model,
            doc.CreatedAt,
        )
        if err != nil {
            return fmt.Errorf("insert document %s: %w", doc.ID, err)
        }
    }

    return tx.Commit()
}

func floatsToString(vals []float32) string {
    var b strings.Builder
    for i, v := range vals {
        if i > 0 {
            b.WriteByte(',')
        }
        fmt.Fprintf(&b, "%f", v)
    }
    return b.String()
}
```
```go
func (s *PgVectorStore) Close() error {
    return s.db.Close()
}
```

Voyage AI provides state-of-the-art embeddings optimized for retrieval.
import "github.com/petal-labs/iris/providers/voyageai"
provider, err := voyageai.NewFromKeystore()
// Use voyage-3-large for best qualityembedder := NewBatchEmbedder(provider, "voyage-3-large", config)
// Or voyage-3-lite for faster processingembedder := NewBatchEmbedder(provider, "voyage-3-lite", config)Models:
| Model | Dimensions | Best For |
|---|---|---|
| voyage-3-large | 1024 | Maximum retrieval quality |
| voyage-3 | 1024 | Balanced performance |
| voyage-3-lite | 512 | Cost-efficient processing |
| voyage-code-3 | 1024 | Code and technical content |
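At query time, embed the search string with a query input type so it lands in the same vector space as your documents. A minimal sketch, assuming the library exposes `core.InputTypeQuery` as the counterpart of the `core.InputTypeDocument` used above:

```go
// Hypothetical query-side embedding; core.InputTypeQuery is assumed
// to exist alongside core.InputTypeDocument.
req := &core.EmbeddingRequest{
    Model:     "voyage-3-large",
    Input:     []core.EmbeddingInput{{Text: "how do I rotate API keys?"}},
    InputType: core.InputTypeQuery,
}
resp, err := provider.Embeddings(ctx, req)
```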
OpenAI’s text-embedding-3 models offer excellent quality with dimensional flexibility.
import "github.com/petal-labs/iris/providers/openai"
provider, err := openai.NewFromKeystore()
// Use text-embedding-3-large for best qualityembedder := NewBatchEmbedder(provider, "text-embedding-3-large", config)
// Or text-embedding-3-small for efficiencyembedder := NewBatchEmbedder(provider, "text-embedding-3-small", config)Models:
| Model | Dimensions | Best For |
|---|---|---|
| text-embedding-3-large | 3072 (or custom) | Maximum quality |
| text-embedding-3-small | 1536 (or custom) | Cost-efficient |
Dimension reduction:
```go
// Reduce dimensions for storage efficiency
req := &core.EmbeddingRequest{
    Model:      "text-embedding-3-large",
    Input:      inputs,
    Dimensions: 1024, // Reduce from 3072
}
```

Google's text-embedding models cover general-purpose and multilingual document processing.
import "github.com/petal-labs/iris/providers/gemini"
provider, err := gemini.NewFromKeystore()
embedder := NewBatchEmbedder(provider, "text-embedding-004", config)Models:
| Model | Dimensions | Best For |
|---|---|---|
| text-embedding-004 | 768 | General purpose |
| text-multilingual-embedding-002 | 768 | Multilingual content |
For improved retrieval, add document context:
```go
func (be *BatchEmbedder) embedBatchWithContext(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    inputs := make([]core.EmbeddingInput, len(docs))
    for i, doc := range docs {
        inputs[i] = core.EmbeddingInput{
            Text:    doc.Content,
            Context: doc.Metadata["title"], // Add title as context
        }
    }

    req := &core.EmbeddingRequest{
        Model:     be.model,
        Input:     inputs,
        InputType: core.InputTypeDocument,
    }

    resp, err := be.provider.Embeddings(ctx, req)
    if err != nil {
        return nil, err
    }

    // ... rest of processing
}
```

For long documents, chunk before embedding:
```go
type Chunker struct {
    ChunkSize    int
    ChunkOverlap int
}

func (c *Chunker) Chunk(doc Document) []Document {
    content := doc.Content
    var chunks []Document

    // Note: slicing is byte-based; multi-byte UTF-8 runes can be split at
    // chunk boundaries. Use rune- or sentence-aware splitting if that matters.
    for i := 0; i < len(content); i += c.ChunkSize - c.ChunkOverlap {
        end := i + c.ChunkSize
        if end > len(content) {
            end = len(content)
        }

        chunk := Document{
            ID:      fmt.Sprintf("%s_chunk_%d", doc.ID, len(chunks)),
            Content: content[i:end],
            Metadata: map[string]string{
                "parent_id":   doc.ID,
                "chunk_index": fmt.Sprintf("%d", len(chunks)),
            },
        }

        // Copy parent metadata
        for k, v := range doc.Metadata {
            if _, exists := chunk.Metadata[k]; !exists {
                chunk.Metadata[k] = v
            }
        }

        chunks = append(chunks, chunk)

        if end >= len(content) {
            break
        }
    }

    return chunks
}

// Usage
chunker := &Chunker{ChunkSize: 512, ChunkOverlap: 50}

var allChunks []Document
for _, doc := range docs {
    chunks := chunker.Chunk(doc)
    allChunks = append(allChunks, chunks...)
}

embedder.Process(ctx, allChunks)
```

Track batch durations to report a running ETA:

```go
type ProgressTracker struct {
    startTime  time.Time
    batchTimes []time.Duration
    processed  int
    total      int
}
func (t *ProgressTracker) Update(processed, total int, batchDuration time.Duration) {
    t.processed = processed
    t.total = total
    t.batchTimes = append(t.batchTimes, batchDuration)
}

func (t *ProgressTracker) ETA() time.Duration {
    if len(t.batchTimes) == 0 || t.processed == 0 {
        return 0
    }

    // Calculate average batch time
    var total time.Duration
    for _, d := range t.batchTimes {
        total += d
    }
    avgBatch := total / time.Duration(len(t.batchTimes))

    remaining := t.total - t.processed
    batchesRemaining := remaining / 128 // Assuming batch size of 128
    if remaining%128 > 0 {
        batchesRemaining++
    }

    return avgBatch * time.Duration(batchesRemaining)
}

func (t *ProgressTracker) String() string {
    pct := float64(t.processed) / float64(t.total) * 100
    eta := t.ETA()
    return fmt.Sprintf("Progress: %d/%d (%.1f%%) - ETA: %v",
        t.processed, t.total, pct, eta.Round(time.Second))
}
```
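Wiring the tracker into the pipeline is a one-liner in the progress callback — a sketch using the `WithProgress` hook defined earlier:

```go
tracker := &ProgressTracker{startTime: time.Now()}

embedder := NewBatchEmbedder(provider, "voyage-3-large", config).
    WithProgress(func(processed, total int, duration time.Duration) {
        tracker.Update(processed, total, duration)
        log.Println(tracker.String())
    })
```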
import "crypto/sha256"
type DeduplicatingStore struct { inner VectorStore contentMap map[string]string // content hash -> document ID}
func (s *DeduplicatingStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error { var unique []EmbeddedDocument
for _, doc := range docs { hash := sha256.Sum256([]byte(doc.Content)) hashStr := fmt.Sprintf("%x", hash)
if existingID, exists := s.contentMap[hashStr]; exists { log.Printf("Skipping duplicate: %s (same as %s)", doc.ID, existingID) continue }
s.contentMap[hashStr] = doc.ID unique = append(unique, doc) }
if len(unique) == 0 { return nil }
return s.inner.Upsert(ctx, unique)}For very large batches, process documents in windows:
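Because it implements `VectorStore`, it can wrap any backend — for example the Qdrant store from earlier, using the constructor added above:

```go
qdrantStore, err := NewQdrantStore("http://localhost:6333", "documents")
if err != nil {
    log.Fatalf("Failed to connect to Qdrant: %v", err)
}

store := NewDeduplicatingStore(qdrantStore)
embedder := NewBatchEmbedder(provider, "voyage-3-large", config).WithStore(store)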
For very large datasets, stream documents from disk and process them in windows:

```go
func ProcessLargeDataset(ctx context.Context, embedder *BatchEmbedder, filepath string) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    decoder := json.NewDecoder(file)

    // Read the opening bracket of the JSON array
    if _, err := decoder.Token(); err != nil {
        return err
    }

    var batch []Document
    batchSize := 10000 // Process 10k docs at a time

    for decoder.More() {
        var doc Document
        if err := decoder.Decode(&doc); err != nil {
            return err
        }

        batch = append(batch, doc)

        if len(batch) >= batchSize {
            if err := embedder.Process(ctx, batch); err != nil {
                return err
            }
            batch = nil // Release memory
        }
    }

    // Process remaining
    if len(batch) > 0 {
        return embedder.Process(ctx, batch)
    }

    return nil
}
```

Classify errors by status code to decide whether to retry:

```go
func handleEmbeddingError(err error) {
    var apiErr *core.APIError
    if errors.As(err, &apiErr) {
        switch apiErr.StatusCode {
        case 400:
            log.Printf("Bad request - check input format: %v", apiErr)
        case 401:
            log.Fatal("Invalid API key")
        case 429:
            log.Printf("Rate limited - backing off")
            time.Sleep(30 * time.Second)
        case 500, 503:
            log.Printf("Server error - will retry: %v", apiErr)
        default:
            log.Printf("API error %d: %v", apiErr.StatusCode, apiErr)
        }
        return
    }

    if errors.Is(err, context.DeadlineExceeded) {
        log.Printf("Request timeout - consider smaller batches")
        return
    }

    if errors.Is(err, context.Canceled) {
        log.Printf("Request canceled")
        return
    }

    log.Printf("Unknown error: %v", err)
}
```

RAG Pipeline
Use these embeddings for retrieval. RAG Pipeline →
Embeddings Guide
Deep dive into embedding concepts. Embeddings →
Vector Stores
Learn about storage options. Vector Stores →
Providers
Compare embedding providers. Providers →