
Batch Embeddings

This example demonstrates production patterns for generating embeddings at scale. You’ll learn how to process large document collections with batching, rate limiting, progress tracking, checkpointing, and incremental persistence to vector stores.

The example builds a batch embedding pipeline that:

  • Processes documents in fixed-size, configurable batches
  • Respects rate limits with ticker-based throttling and backoff on 429s
  • Tracks progress with callbacks and persistence
  • Handles failures gracefully with retry logic and checkpointing
  • Stores embeddings incrementally to avoid losing work
  • Supports multiple vector stores (Qdrant, PostgreSQL, Pinecone)
package main

import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "log"
    "os"
    "os/signal"
    "sync"
    "syscall"
    "time"

    "github.com/petal-labs/iris/core"
    "github.com/petal-labs/iris/providers/voyageai"
)
// Document represents a source document to be embedded.
type Document struct {
    ID       string            `json:"id"`
    Content  string            `json:"content"`
    Metadata map[string]string `json:"metadata,omitempty"`
}

// EmbeddedDocument contains a document with its embedding vector.
type EmbeddedDocument struct {
    Document
    Embedding []float32 `json:"embedding"`
    Model     string    `json:"model"`
    CreatedAt time.Time `json:"created_at"`
}

// BatchConfig controls the embedding pipeline behavior.
type BatchConfig struct {
    BatchSize       int           // Documents per batch (default: 128)
    MaxConcurrency  int           // Parallel batch workers (default: 3)
    RequestsPerMin  int           // Rate limit (default: 300 for Voyage AI)
    RetryAttempts   int           // Retries per batch (default: 3)
    RetryDelay      time.Duration // Initial retry delay (default: 1s)
    CheckpointEvery int           // Save progress every N documents (default: 1000)
}
// DefaultBatchConfig returns sensible defaults for production use.
func DefaultBatchConfig() BatchConfig {
    return BatchConfig{
        BatchSize:       128,
        MaxConcurrency:  3,
        RequestsPerMin:  300,
        RetryAttempts:   3,
        RetryDelay:      time.Second,
        CheckpointEvery: 1000,
    }
}
// ProgressCallback receives updates during batch processing.
type ProgressCallback func(processed, total int, lastBatchDuration time.Duration)

// VectorStore defines the interface for persisting embeddings.
type VectorStore interface {
    // Upsert stores or updates embedded documents.
    Upsert(ctx context.Context, docs []EmbeddedDocument) error
    // Close releases any resources.
    Close() error
}
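
// memoryStore is a minimal in-memory VectorStore, useful for testing the
// pipeline without a running database. (Illustrative addition; it assumes
// only the interface above and is not part of the original example.)
type memoryStore struct {
    mu   sync.Mutex
    docs map[string]EmbeddedDocument
}

func (m *memoryStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
    m.mu.Lock()
    defer m.mu.Unlock()
    if m.docs == nil {
        m.docs = make(map[string]EmbeddedDocument)
    }
    for _, d := range docs {
        m.docs[d.ID] = d
    }
    return nil
}

func (m *memoryStore) Close() error { return nil }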
// BatchEmbedder processes documents in batches with rate limiting.
type BatchEmbedder struct {
    provider   core.EmbeddingProvider
    model      string
    config     BatchConfig
    store      VectorStore
    onProgress ProgressCallback

    // Rate limiting
    rateLimiter *time.Ticker

    // Checkpointing
    checkpointFile string
    checkpoint     *Checkpoint
    mu             sync.Mutex
}

// Checkpoint tracks progress for resumable processing.
type Checkpoint struct {
    ProcessedIDs map[string]bool `json:"processed_ids"`
    LastUpdated  time.Time       `json:"last_updated"`
    TotalCount   int             `json:"total_count"`
}
// NewBatchEmbedder creates a new batch processor.
func NewBatchEmbedder(provider core.EmbeddingProvider, model string, config BatchConfig) *BatchEmbedder {
    // Spread requests evenly across the minute:
    // e.g. 300 requests/min means one tick every 200ms.
    interval := time.Minute / time.Duration(config.RequestsPerMin)
    return &BatchEmbedder{
        provider:    provider,
        model:       model,
        config:      config,
        rateLimiter: time.NewTicker(interval),
        checkpoint:  &Checkpoint{ProcessedIDs: make(map[string]bool)},
    }
}
// WithStore sets the vector store for persistence.
func (be *BatchEmbedder) WithStore(store VectorStore) *BatchEmbedder {
    be.store = store
    return be
}

// WithProgress sets a progress callback.
func (be *BatchEmbedder) WithProgress(cb ProgressCallback) *BatchEmbedder {
    be.onProgress = cb
    return be
}

// WithCheckpoint enables resumable processing from a file.
func (be *BatchEmbedder) WithCheckpoint(filepath string) *BatchEmbedder {
    be.checkpointFile = filepath
    be.loadCheckpoint()
    return be
}
func (be *BatchEmbedder) loadCheckpoint() {
    data, err := os.ReadFile(be.checkpointFile)
    if err != nil {
        return // No checkpoint exists yet
    }
    if err := json.Unmarshal(data, be.checkpoint); err != nil {
        log.Printf("Warning: ignoring corrupt checkpoint %s: %v", be.checkpointFile, err)
        be.checkpoint = &Checkpoint{ProcessedIDs: make(map[string]bool)}
        return
    }
    if be.checkpoint.ProcessedIDs == nil {
        be.checkpoint.ProcessedIDs = make(map[string]bool)
    }
}

func (be *BatchEmbedder) saveCheckpoint() error {
    be.mu.Lock()
    defer be.mu.Unlock()
    be.checkpoint.LastUpdated = time.Now()
    data, err := json.MarshalIndent(be.checkpoint, "", "  ")
    if err != nil {
        return err
    }
    return os.WriteFile(be.checkpointFile, data, 0644)
}
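
// For reference, a checkpoint file written by saveCheckpoint looks like this
// (illustrative values, following the Checkpoint struct's JSON tags):
//
//	{
//	  "processed_ids": {"doc-001": true, "doc-002": true},
//	  "last_updated": "2025-01-02T15:04:05Z",
//	  "total_count": 5000
//	}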
// Process embeds all documents and stores them.
func (be *BatchEmbedder) Process(ctx context.Context, docs []Document) error {
    // Filter already-processed documents
    var pending []Document
    for _, doc := range docs {
        if !be.checkpoint.ProcessedIDs[doc.ID] {
            pending = append(pending, doc)
        }
    }
    if len(pending) == 0 {
        log.Println("All documents already processed")
        return nil
    }
    be.checkpoint.TotalCount = len(docs)
    total := len(docs)
    processed := total - len(pending)
    lastCheckpoint := processed
    log.Printf("Processing %d documents (%d already complete)", len(pending), processed)

    // Create batches
    var batches [][]Document
    for i := 0; i < len(pending); i += be.config.BatchSize {
        end := i + be.config.BatchSize
        if end > len(pending) {
            end = len(pending)
        }
        batches = append(batches, pending[i:end])
    }

    // Process with concurrency control
    results := make(chan []EmbeddedDocument, len(batches))
    errs := make(chan error, len(batches))
    var wg sync.WaitGroup
    semaphore := make(chan struct{}, be.config.MaxConcurrency)

    for batchNum, batch := range batches {
        select {
        case <-ctx.Done():
            // Persist whatever finished before we were canceled.
            if be.checkpointFile != "" {
                be.saveCheckpoint()
            }
            return ctx.Err()
        default:
        }
        wg.Add(1)
        go func(num int, batchDocs []Document) {
            defer wg.Done()
            semaphore <- struct{}{}        // Acquire
            defer func() { <-semaphore }() // Release

            // Wait for rate limit
            <-be.rateLimiter.C

            start := time.Now()
            embedded, err := be.processBatch(ctx, batchDocs)
            duration := time.Since(start)
            if err != nil {
                errs <- fmt.Errorf("batch %d: %w", num, err)
                return
            }
            results <- embedded

            // Update progress under the lock; snapshot the counter so the
            // callback sees a consistent value.
            be.mu.Lock()
            for _, doc := range embedded {
                be.checkpoint.ProcessedIDs[doc.ID] = true
            }
            processed += len(embedded)
            current := processed
            shouldCheckpoint := be.checkpointFile != "" &&
                current-lastCheckpoint >= be.config.CheckpointEvery
            if shouldCheckpoint {
                lastCheckpoint = current
            }
            be.mu.Unlock()

            if be.onProgress != nil {
                be.onProgress(current, total, duration)
            }
            if shouldCheckpoint {
                be.saveCheckpoint()
            }
        }(batchNum, batch)
    }

    // Collect results in background
    go func() {
        wg.Wait()
        close(results)
        close(errs)
    }()

    // Store results as they arrive
    var allEmbedded []EmbeddedDocument
    for embedded := range results {
        allEmbedded = append(allEmbedded, embedded...)
        // Incremental persistence once a couple of batches have accumulated
        if be.store != nil && len(allEmbedded) >= be.config.BatchSize*2 {
            if err := be.store.Upsert(ctx, allEmbedded); err != nil {
                log.Printf("Warning: store upsert failed: %v", err)
            } else {
                allEmbedded = nil // Clear after successful store
            }
        }
    }
    // Store remaining
    if be.store != nil && len(allEmbedded) > 0 {
        if err := be.store.Upsert(ctx, allEmbedded); err != nil {
            return fmt.Errorf("final store upsert: %w", err)
        }
    }

    // Check for errors
    var failed []error
    for err := range errs {
        failed = append(failed, err)
    }
    if len(failed) > 0 {
        return fmt.Errorf("batch processing errors: %v", failed)
    }

    // Final checkpoint
    if be.checkpointFile != "" {
        return be.saveCheckpoint()
    }
    return nil
}
func (be *BatchEmbedder) processBatch(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    var lastErr error
    for attempt := 0; attempt < be.config.RetryAttempts; attempt++ {
        if attempt > 0 {
            // Exponential backoff: 2x the delay per attempt
            delay := be.config.RetryDelay * time.Duration(1<<attempt)
            select {
            case <-ctx.Done():
                return nil, ctx.Err()
            case <-time.After(delay):
            }
        }
        embedded, err := be.embedBatch(ctx, docs)
        if err == nil {
            return embedded, nil
        }
        lastErr = err

        // Check whether the error is retryable
        var apiErr *core.APIError
        if errors.As(err, &apiErr) {
            if apiErr.StatusCode == 429 {
                // Rate limited - back off longer, but stay cancelable
                select {
                case <-ctx.Done():
                    return nil, ctx.Err()
                case <-time.After(time.Duration(attempt+1) * 5 * time.Second):
                }
                continue
            }
            if apiErr.StatusCode >= 500 {
                continue // Server error, retry
            }
            return nil, err // Other client errors are not retryable
        }
    }
    return nil, fmt.Errorf("after %d attempts: %w", be.config.RetryAttempts, lastErr)
}
func (be *BatchEmbedder) embedBatch(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    // Build embedding inputs
    inputs := make([]core.EmbeddingInput, len(docs))
    for i, doc := range docs {
        inputs[i] = core.EmbeddingInput{
            Text: doc.Content,
        }
    }
    req := &core.EmbeddingRequest{
        Model:     be.model,
        Input:     inputs,
        InputType: core.InputTypeDocument,
    }
    resp, err := be.provider.Embeddings(ctx, req)
    if err != nil {
        return nil, err
    }
    if len(resp.Embeddings) != len(docs) {
        return nil, fmt.Errorf("embedding count mismatch: got %d, expected %d",
            len(resp.Embeddings), len(docs))
    }
    // Build embedded documents
    embedded := make([]EmbeddedDocument, len(docs))
    for i, doc := range docs {
        embedded[i] = EmbeddedDocument{
            Document:  doc,
            Embedding: resp.Embeddings[i].Values,
            Model:     be.model,
            CreatedAt: time.Now(),
        }
    }
    return embedded, nil
}
// Close releases resources.
func (be *BatchEmbedder) Close() {
    be.rateLimiter.Stop()
    if be.store != nil {
        be.store.Close()
    }
}
func main() {
    // Initialize provider from the keystore, falling back to the environment
    provider, err := voyageai.NewFromKeystore()
    if err != nil {
        provider, err = voyageai.NewFromEnv()
        if err != nil {
            log.Fatal("No Voyage AI key. Run: iris keys set voyageai")
        }
    }

    // Load documents (example: from JSON file)
    docs, err := loadDocuments("documents.json")
    if err != nil {
        log.Fatalf("Failed to load documents: %v", err)
    }

    // Configure batch processing
    config := DefaultBatchConfig()
    config.BatchSize = 128      // Voyage AI supports up to 128 inputs per request
    config.MaxConcurrency = 3   // Parallel batches
    config.RequestsPerMin = 300 // Stay under the rate limit

    // Create embedder with Qdrant storage (the Go client speaks gRPC on 6334)
    store, err := NewQdrantStore("localhost", "documents")
    if err != nil {
        log.Fatalf("Failed to connect to Qdrant: %v", err)
    }
    embedder := NewBatchEmbedder(provider, "voyage-3-large", config).
        WithStore(store).
        WithCheckpoint("embedding_progress.json").
        WithProgress(func(processed, total int, duration time.Duration) {
            pct := float64(processed) / float64(total) * 100
            log.Printf("Progress: %d/%d (%.1f%%) - last batch: %v",
                processed, total, pct, duration)
        })
    defer embedder.Close()

    // Handle graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
    go func() {
        <-sigCh
        log.Println("Shutting down gracefully...")
        cancel()
    }()

    // Process all documents
    start := time.Now()
    if err := embedder.Process(ctx, docs); err != nil {
        if ctx.Err() != nil {
            log.Println("Processing interrupted - progress saved to checkpoint")
        } else {
            log.Fatalf("Processing failed: %v", err)
        }
    }
    log.Printf("Completed in %v", time.Since(start))
}
func loadDocuments(filepath string) ([]Document, error) {
    data, err := os.ReadFile(filepath)
    if err != nil {
        return nil, err
    }
    var docs []Document
    if err := json.Unmarshal(data, &docs); err != nil {
        return nil, err
    }
    return docs, nil
}
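
loadDocuments expects documents.json to contain a JSON array of Document objects matching the struct's JSON tags. An illustrative file:

[
  {
    "id": "doc-001",
    "content": "Qdrant is a vector database for similarity search.",
    "metadata": {"title": "Qdrant Overview", "source": "docs"}
  },
  {
    "id": "doc-002",
    "content": "pgvector adds vector similarity search to PostgreSQL.",
    "metadata": {"title": "pgvector Intro", "source": "docs"}
  }
]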
The Qdrant store connects over gRPC using the official Go client. Note that the client uses Qdrant's gRPC port (6334), not the REST port (6333):

import (
    "context"
    "fmt"
    "time"

    "github.com/qdrant/go-client/qdrant"
)

type QdrantStore struct {
    client     *qdrant.Client
    collection string
}

func NewQdrantStore(host, collection string) (*QdrantStore, error) {
    client, err := qdrant.NewClient(&qdrant.Config{
        Host: host,
        Port: 6334, // Qdrant's gRPC port
    })
    if err != nil {
        return nil, fmt.Errorf("connect to Qdrant: %w", err)
    }
    return &QdrantStore{
        client:     client,
        collection: collection,
    }, nil
}
func (s *QdrantStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
    points := make([]*qdrant.PointStruct, len(docs))
    for i, doc := range docs {
        // NewValueMap expects map[string]interface{} values, so copy the
        // string-typed metadata map
        meta := make(map[string]interface{}, len(doc.Metadata))
        for k, v := range doc.Metadata {
            meta[k] = v
        }
        payload := map[string]interface{}{
            "content":    doc.Content,
            "metadata":   meta,
            "model":      doc.Model,
            "created_at": doc.CreatedAt.Format(time.RFC3339),
        }
        points[i] = &qdrant.PointStruct{
            Id: qdrant.NewIDUUID(doc.ID),
            // The Go client takes float32 vectors directly; no conversion needed
            Vectors: qdrant.NewVectorsDense(doc.Embedding),
            Payload: qdrant.NewValueMap(payload),
        }
    }
    _, err := s.client.Upsert(ctx, &qdrant.UpsertPoints{
        CollectionName: s.collection,
        Points:         points,
    })
    return err
}

func (s *QdrantStore) Close() error {
    return s.client.Close()
}
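
One thing the store does not do is create the collection. A sketch of that step, assuming the Go client's CollectionExists and CreateCollection helpers (verify against your client version), with the vector size matching the embedding model:

// EnsureCollection creates the collection if it is missing, sized to the
// embedding model's dimensionality (e.g. 1024 for voyage-3-large; see the
// model table below). Illustrative sketch, not part of the original example.
func (s *QdrantStore) EnsureCollection(ctx context.Context, dims uint64) error {
    exists, err := s.client.CollectionExists(ctx, s.collection)
    if err != nil {
        return err
    }
    if exists {
        return nil
    }
    return s.client.CreateCollection(ctx, &qdrant.CreateCollection{
        CollectionName: s.collection,
        VectorsConfig: qdrant.NewVectorsConfig(&qdrant.VectorParams{
            Size:     dims,
            Distance: qdrant.Distance_Cosine,
        }),
    })
}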
The PostgreSQL store persists embeddings to a pgvector column:

import (
    "context"
    "database/sql"
    "encoding/json"
    "fmt"
    "strings"

    _ "github.com/lib/pq" // registers the "postgres" driver
)

type PgVectorStore struct {
    db        *sql.DB
    tableName string
}

func NewPgVectorStore(connStr, tableName string) (*PgVectorStore, error) {
    db, err := sql.Open("postgres", connStr)
    if err != nil {
        return nil, fmt.Errorf("connect to PostgreSQL: %w", err)
    }
    // Ensure the pgvector extension exists
    if _, err := db.Exec("CREATE EXTENSION IF NOT EXISTS vector"); err != nil {
        return nil, fmt.Errorf("create vector extension: %w", err)
    }
    return &PgVectorStore{
        db:        db,
        tableName: tableName,
    }, nil
}
func (s *PgVectorStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
    tx, err := s.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // Note: tableName is interpolated into the SQL, so it must come from
    // trusted configuration, never from user input.
    stmt, err := tx.PrepareContext(ctx, fmt.Sprintf(`
        INSERT INTO %s (id, content, embedding, metadata, model, created_at)
        VALUES ($1, $2, $3, $4, $5, $6)
        ON CONFLICT (id) DO UPDATE SET
            content = EXCLUDED.content,
            embedding = EXCLUDED.embedding,
            metadata = EXCLUDED.metadata,
            model = EXCLUDED.model,
            created_at = EXCLUDED.created_at
    `, s.tableName))
    if err != nil {
        return err
    }
    defer stmt.Close()

    for _, doc := range docs {
        // Convert the embedding to pgvector's text format: [v1,v2,...]
        vectorStr := fmt.Sprintf("[%s]", floatsToString(doc.Embedding))
        // database/sql cannot bind a Go map directly; serialize metadata as JSON
        metadata, err := json.Marshal(doc.Metadata)
        if err != nil {
            return fmt.Errorf("marshal metadata for %s: %w", doc.ID, err)
        }
        if _, err := stmt.ExecContext(ctx,
            doc.ID,
            doc.Content,
            vectorStr,
            metadata,
            doc.Model,
            doc.CreatedAt,
        ); err != nil {
            return fmt.Errorf("insert document %s: %w", doc.ID, err)
        }
    }
    return tx.Commit()
}
func floatsToString(vals []float32) string {
    var sb strings.Builder
    for i, v := range vals {
        if i > 0 {
            sb.WriteByte(',')
        }
        fmt.Fprintf(&sb, "%f", v)
    }
    return sb.String()
}

func (s *PgVectorStore) Close() error {
    return s.db.Close()
}
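
The Upsert above assumes a table with matching columns already exists. A plausible schema, embedded as a Go constant (vector(1024) assumes a 1024-dimensional model such as voyage-3-large; this DDL is illustrative, not from the original example):

// Hypothetical DDL matching the INSERT statement above.
const createDocumentsTable = `
CREATE TABLE IF NOT EXISTS documents (
    id         TEXT PRIMARY KEY,
    content    TEXT NOT NULL,
    embedding  vector(1024),
    metadata   JSONB,
    model      TEXT,
    created_at TIMESTAMPTZ
)`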

Voyage AI provides state-of-the-art embeddings optimized for retrieval.

import "github.com/petal-labs/iris/providers/voyageai"
provider, err := voyageai.NewFromKeystore()
// Use voyage-3-large for best quality
embedder := NewBatchEmbedder(provider, "voyage-3-large", config)
// Or voyage-3-lite for faster processing
embedder := NewBatchEmbedder(provider, "voyage-3-lite", config)

Models:

Model            Dimensions   Best For
voyage-3-large   1024         Maximum retrieval quality
voyage-3         1024         Balanced performance
voyage-3-lite    512          Cost-efficient processing
voyage-code-3    1024         Code and technical content
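
Since the vector store's collection or column must match the model's output dimensionality, it can help to derive one from the other. A small helper based on the table above (the mapping is from this page; the helper itself is illustrative):

// modelDimensions returns the embedding size for a known Voyage AI model.
func modelDimensions(model string) (int, bool) {
    dims := map[string]int{
        "voyage-3-large": 1024,
        "voyage-3":       1024,
        "voyage-3-lite":  512,
        "voyage-code-3":  1024,
    }
    d, ok := dims[model]
    return d, ok
}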

For improved retrieval, add document context:

func (be *BatchEmbedder) embedBatchWithContext(ctx context.Context, docs []Document) ([]EmbeddedDocument, error) {
    inputs := make([]core.EmbeddingInput, len(docs))
    for i, doc := range docs {
        inputs[i] = core.EmbeddingInput{
            Text:    doc.Content,
            Context: doc.Metadata["title"], // Add the document title as context
        }
    }
    req := &core.EmbeddingRequest{
        Model:     be.model,
        Input:     inputs,
        InputType: core.InputTypeDocument,
    }
    resp, err := be.provider.Embeddings(ctx, req)
    if err != nil {
        return nil, err
    }
    // ... rest of processing (same as embedBatch)
}

For long documents, chunk before embedding:

type Chunker struct {
    ChunkSize    int // Chunk length in bytes
    ChunkOverlap int // Overlap between consecutive chunks (must be < ChunkSize)
}

func (c *Chunker) Chunk(doc Document) []Document {
    content := doc.Content
    step := c.ChunkSize - c.ChunkOverlap
    if step < 1 {
        step = c.ChunkSize // Guard against overlap >= size, which would never advance
    }
    var chunks []Document
    for i := 0; i < len(content); i += step {
        end := i + c.ChunkSize
        if end > len(content) {
            end = len(content)
        }
        chunk := Document{
            ID:      fmt.Sprintf("%s_chunk_%d", doc.ID, len(chunks)),
            Content: content[i:end],
            Metadata: map[string]string{
                "parent_id":   doc.ID,
                "chunk_index": fmt.Sprintf("%d", len(chunks)),
            },
        }
        // Copy parent metadata without overwriting chunk-specific keys
        for k, v := range doc.Metadata {
            if _, exists := chunk.Metadata[k]; !exists {
                chunk.Metadata[k] = v
            }
        }
        chunks = append(chunks, chunk)
        if end >= len(content) {
            break
        }
    }
    return chunks
}
// Usage
chunker := &Chunker{ChunkSize: 512, ChunkOverlap: 50}
var allChunks []Document
for _, doc := range docs {
    allChunks = append(allChunks, chunker.Chunk(doc)...)
}
if err := embedder.Process(ctx, allChunks); err != nil {
    log.Fatalf("chunk embedding failed: %v", err)
}
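
The chunker above slices by byte offset, which can split multi-byte UTF-8 characters at chunk boundaries. A rune-based variant avoids that (a sketch, not part of the pipeline above):

// chunkRunes splits text by rune count so multi-byte characters stay intact.
func chunkRunes(s string, size, overlap int) []string {
    runes := []rune(s)
    step := size - overlap
    if step < 1 {
        step = size // Guard against overlap >= size
    }
    var out []string
    for i := 0; i < len(runes); i += step {
        end := i + size
        if end > len(runes) {
            end = len(runes)
        }
        out = append(out, string(runes[i:end]))
        if end == len(runes) {
            break
        }
    }
    return out
}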
For long runs it helps to show an ETA. This tracker averages observed batch durations:

type ProgressTracker struct {
    startTime  time.Time
    batchTimes []time.Duration
    processed  int
    total      int
    batchSize  int // Documents per batch; used for the ETA estimate
}

func (t *ProgressTracker) Update(processed, total int, batchDuration time.Duration) {
    t.processed = processed
    t.total = total
    t.batchTimes = append(t.batchTimes, batchDuration)
}

func (t *ProgressTracker) ETA() time.Duration {
    if len(t.batchTimes) == 0 || t.processed == 0 {
        return 0
    }
    // Calculate the average batch time
    var sum time.Duration
    for _, d := range t.batchTimes {
        sum += d
    }
    avgBatch := sum / time.Duration(len(t.batchTimes))

    batchSize := t.batchSize
    if batchSize <= 0 {
        batchSize = 128 // Fall back to the default BatchConfig size
    }
    remaining := t.total - t.processed
    batchesRemaining := remaining / batchSize
    if remaining%batchSize > 0 {
        batchesRemaining++
    }
    return avgBatch * time.Duration(batchesRemaining)
}

func (t *ProgressTracker) String() string {
    pct := float64(t.processed) / float64(t.total) * 100
    return fmt.Sprintf("Progress: %d/%d (%.1f%%) - ETA: %v",
        t.processed, t.total, pct, t.ETA().Round(time.Second))
}
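
Wiring the tracker into the embedder's progress callback (a call-site sketch; config and embedder are the values from main above):

tracker := &ProgressTracker{startTime: time.Now(), batchSize: config.BatchSize}
embedder.WithProgress(func(processed, total int, duration time.Duration) {
    tracker.Update(processed, total, duration)
    log.Println(tracker)
})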

Avoid storing identical content twice. Note that this wrapper deduplicates at the store layer, after embedding; to also save embedding API calls, filter documents by content hash before calling Process:

import "crypto/sha256"
type DeduplicatingStore struct {
inner VectorStore
contentMap map[string]string // content hash -> document ID
}
func (s *DeduplicatingStore) Upsert(ctx context.Context, docs []EmbeddedDocument) error {
var unique []EmbeddedDocument
for _, doc := range docs {
hash := sha256.Sum256([]byte(doc.Content))
hashStr := fmt.Sprintf("%x", hash)
if existingID, exists := s.contentMap[hashStr]; exists {
log.Printf("Skipping duplicate: %s (same as %s)", doc.ID, existingID)
continue
}
s.contentMap[hashStr] = doc.ID
unique = append(unique, doc)
}
if len(unique) == 0 {
return nil
}
return s.inner.Upsert(ctx, unique)
}
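
To use it, wrap any concrete store before handing it to the embedder:

store, err := NewQdrantStore("localhost", "documents")
if err != nil {
    log.Fatalf("connect: %v", err)
}
embedder := NewBatchEmbedder(provider, "voyage-3-large", config).
    WithStore(NewDeduplicatingStore(store))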

For datasets too large to hold in memory, stream documents from disk and process them in windows:

func ProcessLargeDataset(ctx context.Context, embedder *BatchEmbedder, filepath string) error {
    file, err := os.Open(filepath)
    if err != nil {
        return err
    }
    defer file.Close()

    decoder := json.NewDecoder(file)
    // Read the opening bracket of the JSON array
    if _, err := decoder.Token(); err != nil {
        return err
    }

    var window []Document
    windowSize := 10000 // Process 10k docs at a time
    for decoder.More() {
        var doc Document
        if err := decoder.Decode(&doc); err != nil {
            return err
        }
        window = append(window, doc)
        if len(window) >= windowSize {
            if err := embedder.Process(ctx, window); err != nil {
                return err
            }
            window = nil // Release memory
        }
    }
    // Process the remainder
    if len(window) > 0 {
        return embedder.Process(ctx, window)
    }
    return nil
}
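
Called in place of the in-memory loadDocuments/Process pair; combined with WithCheckpoint, documents embedded in earlier runs are skipped on restart:

if err := ProcessLargeDataset(ctx, embedder, "documents.json"); err != nil {
    log.Fatalf("streaming processing failed: %v", err)
}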
Classify failures to decide how to react:

func handleEmbeddingError(err error) {
    var apiErr *core.APIError
    if errors.As(err, &apiErr) {
        switch apiErr.StatusCode {
        case 400:
            log.Printf("Bad request - check input format: %v", apiErr)
        case 401:
            log.Fatal("Invalid API key")
        case 429:
            log.Printf("Rate limited - backing off")
            time.Sleep(30 * time.Second)
        case 500, 503:
            log.Printf("Server error - will retry: %v", apiErr)
        default:
            log.Printf("API error %d: %v", apiErr.StatusCode, apiErr)
        }
        return
    }
    if errors.Is(err, context.DeadlineExceeded) {
        log.Printf("Request timeout - consider smaller batches")
        return
    }
    if errors.Is(err, context.Canceled) {
        log.Printf("Request canceled")
        return
    }
    log.Printf("Unknown error: %v", err)
}
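
A typical call site, classifying whatever Process returns:

if err := embedder.Process(ctx, docs); err != nil {
    handleEmbeddingError(err)
}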