Skip to content

Data Enrichment

This example builds a data enrichment pipeline that validates, normalizes, and enriches business records using external APIs and LLM-powered extraction.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Validate   │────▶│  Normalize  │────▶│   Dedupe    │────▶│   Enrich    │
│    Input    │     │    Data     │     │    Check    │     │   Company   │
└─────────────┘     └─────────────┘     └─────────────┘     └──────┬──────┘
       ┌───────────────────────────────────────────────────────────┘
       ▼
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Enrich    │────▶│    Score    │────▶│   Output    │
│   Contact   │     │    Lead     │     │   Record    │
└─────────────┘     └─────────────┘     └─────────────┘
Common use cases:

  • Lead enrichment: Enhance inbound leads with company and contact data
  • CRM hygiene: Clean and normalize existing CRM records
  • Data migration: Transform and validate data during system migrations
  • List building: Enrich prospect lists with firmographic data

Complete implementation:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/mail"
"os"
"regexp"
"strings"
"time"
"github.com/petal-labs/iris/providers/openai"
"github.com/petal-labs/petalflow"
"github.com/petal-labs/petalflow/irisadapter"
)
// Lead represents an inbound lead record. The pipeline reads it from the
// "lead" envelope variable; validate_input only checks Email, so every other
// field may be empty.
type Lead struct {
	// ID is the caller-assigned identifier, e.g. "lead_001".
	ID      string `json:"id"`
	Email   string `json:"email"`
	Name    string `json:"name"`
	Company string `json:"company"`
	Phone   string `json:"phone"`
	// Website is carried through unchanged; the company domain used for
	// enrichment is derived from Email, not from this field.
	Website string `json:"website"`
	// Source names where the lead came from (e.g. "website_demo_request")
	// and is included in the scoring prompt.
	Source string `json:"source"`
	// Message is the lead's free-text message, also included in the
	// scoring prompt.
	Message string `json:"message"`
}
// EnrichedLead is the pipeline's output record: the original Lead plus
// normalized fields, enrichment data, and the LLM lead score. The terminal
// nodes handle_invalid, handle_duplicate, and build_output each write one to
// the "enriched_lead" variable; fields a path never reaches stay zero-valued.
type EnrichedLead struct {
	Lead
	// NormalizedEmail is Email trimmed and lower-cased.
	NormalizedEmail string `json:"normalized_email"`
	// NormalizedPhone is Phone reduced to digits and, where normalizePhone
	// can, formatted as E.164.
	NormalizedPhone string `json:"normalized_phone"`
	// NormalizedCompany is Company with common legal suffixes such as
	// "Inc" or "LLC" removed (see normalizeCompanyName).
	NormalizedCompany string `json:"normalized_company"`
	// Domain is the part of the normalized email after "@"; it is the key
	// passed to company enrichment.
	Domain string `json:"domain"`
	// CompanyData and ContactData hold the provider responses, or
	// {"enrichment_status": "failed"} when the lookup returned an error.
	CompanyData map[string]any `json:"company_data"`
	ContactData map[string]any `json:"contact_data"`
	// LeadScore is the model-assigned score; the prompt asks for 0-100 but
	// this struct does not enforce that range.
	LeadScore    int      `json:"lead_score"`
	ScoreReasons []string `json:"score_reasons"`
	IsDuplicate  bool     `json:"is_duplicate"`
	// DuplicateOf is the ID of the existing record; set only when
	// IsDuplicate is true.
	DuplicateOf string `json:"duplicate_of,omitempty"`
	// ProcessedAt is the UTC time at which the record was finalized.
	ProcessedAt time.Time `json:"processed_at"`
	// Errors lists validation or run failures; omitted from JSON when empty.
	Errors []string `json:"errors,omitempty"`
}
// main builds the enrichment graph backed by OpenAI (through the iris
// provider adapter) and runs it on a sample lead. It exits immediately when
// OPENAI_API_KEY is unset, rather than failing later inside the LLM call.
func main() {
	apiKey := os.Getenv("OPENAI_API_KEY")
	if apiKey == "" {
		log.Fatal("OPENAI_API_KEY is not set")
	}
	provider := openai.New(apiKey)
	client := irisadapter.NewProviderAdapter(provider)
	graph := buildEnrichmentGraph(client)
	runEnrichmentPipeline(graph)
}
// buildEnrichmentGraph wires the lead-enrichment pipeline and returns it:
//
//	validate_input --(fail)--> handle_invalid
//	validate_input -> normalize_data -> dedupe_check -> dedupe_router
//	dedupe_router --(is_duplicate)--> handle_duplicate
//	dedupe_router --(default)--> enrich_company -> enrich_contact -> score_lead -> build_output
//
// handle_invalid, handle_duplicate, and build_output each write an
// EnrichedLead to the "enriched_lead" variable. Node callbacks return an
// error, instead of panicking, when a variable they depend on is missing or
// has an unexpected type. client is the LLM used by score_lead.
func buildEnrichmentGraph(client *irisadapter.ProviderAdapter) petalflow.Graph {
	g := petalflow.NewGraph("lead-enrichment")

	// Stage 1: Validate Input. Failing records are routed to handle_invalid
	// with the guard messages stored under "validation_errors".
	validateNode := petalflow.NewGuardianNode("validate_input", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "lead.email", Op: petalflow.OpNotEmpty, Message: "Email is required"},
			{Var: "lead.email", Op: petalflow.OpMatches, Value: `^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`, Message: "Invalid email format"},
		},
		OnFail:    petalflow.GuardActionRoute,
		FailRoute: "handle_invalid",
		ErrorKey:  "validation_errors",
	})

	// Handle invalid records.
	handleInvalidNode := petalflow.NewTransformNode("handle_invalid", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput(inputs)
			if err != nil {
				return nil, err
			}
			// Tolerate a missing or differently-typed error list, but never emit
			// an invalid record whose (omitempty) Errors field would vanish.
			validationErrors, _ := inputs["validation_errors"].([]string)
			if len(validationErrors) == 0 {
				validationErrors = []string{"validation failed"}
			}
			return EnrichedLead{
				Lead:        lead,
				Errors:      validationErrors,
				ProcessedAt: time.Now().UTC(),
			}, nil
		},
		InputKeys: []string{"lead", "validation_errors"},
		OutputKey: "enriched_lead",
	})

	// Stage 2: Normalize Data.
	normalizeNode := petalflow.NewTransformNode("normalize_data", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput(inputs)
			if err != nil {
				return nil, err
			}
			email, domain := normalizeEmail(lead.Email)
			return map[string]any{
				"normalized_email":   email,
				"normalized_phone":   normalizePhone(lead.Phone),
				"normalized_company": normalizeCompanyName(lead.Company),
				"domain":             domain,
			}, nil
		},
		InputKeys: []string{"lead"},
		OutputKey: "normalized_data",
	})

	// Stage 3: Deduplication Check.
	dedupeNode := petalflow.NewToolNode("dedupe_check", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := asMap("normalized_data", env.GetVar("normalized_data"))
			if err != nil {
				return err
			}
			existing, duplicateID := checkDuplicate(ctx,
				stringField(normalizedData, "normalized_email"),
				stringField(normalizedData, "domain"))
			env.SetVar("is_duplicate", existing)
			if existing {
				env.SetVar("duplicate_of", duplicateID)
			}
			return nil
		},
		Timeout: 5 * time.Second,
	})

	// Route based on duplicate status.
	dedupeRouter := petalflow.NewRuleRouter("dedupe_router", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			{
				When: petalflow.RouteCondition{Var: "is_duplicate", Op: petalflow.OpEquals, Value: true},
				To:   "handle_duplicate",
			},
		},
		Default: "enrich_company",
	})

	// Handle duplicates: keep every normalized field, including the phone.
	handleDuplicateNode := petalflow.NewTransformNode("handle_duplicate", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput(inputs)
			if err != nil {
				return nil, err
			}
			normalizedData, err := asMap("normalized_data", inputs["normalized_data"])
			if err != nil {
				return nil, err
			}
			duplicateOf, _ := inputs["duplicate_of"].(string)
			return EnrichedLead{
				Lead:              lead,
				NormalizedEmail:   stringField(normalizedData, "normalized_email"),
				NormalizedPhone:   stringField(normalizedData, "normalized_phone"),
				NormalizedCompany: stringField(normalizedData, "normalized_company"),
				Domain:            stringField(normalizedData, "domain"),
				IsDuplicate:       true,
				DuplicateOf:       duplicateOf,
				ProcessedAt:       time.Now().UTC(),
			}, nil
		},
		InputKeys: []string{"lead", "normalized_data", "duplicate_of"},
		OutputKey: "enriched_lead",
	})

	// Stage 4: Company Enrichment. A provider failure is logged and recorded
	// as a status marker so the pipeline still produces a scored lead.
	enrichCompanyNode := petalflow.NewToolNode("enrich_company", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := asMap("normalized_data", env.GetVar("normalized_data"))
			if err != nil {
				return err
			}
			domain := stringField(normalizedData, "domain")
			// Call company data API (e.g., Clearbit, Apollo, ZoomInfo)
			companyData, err := fetchCompanyData(ctx, domain)
			if err != nil {
				log.Printf("Company enrichment failed for %s: %v", domain, err)
				companyData = map[string]any{"enrichment_status": "failed"}
			}
			env.SetVar("company_data", companyData)
			return nil
		},
		Timeout: 10 * time.Second,
	})

	// Stage 5: Contact Enrichment, with the same best-effort failure handling.
	enrichContactNode := petalflow.NewToolNode("enrich_contact", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := asMap("normalized_data", env.GetVar("normalized_data"))
			if err != nil {
				return err
			}
			email := stringField(normalizedData, "normalized_email")
			// Call contact data API
			contactData, err := fetchContactData(ctx, email)
			if err != nil {
				log.Printf("Contact enrichment failed for %s: %v", email, err)
				contactData = map[string]any{"enrichment_status": "failed"}
			}
			env.SetVar("contact_data", contactData)
			return nil
		},
		Timeout: 10 * time.Second,
	})

	// Stage 6: LLM-based Lead Scoring.
	scoreNode := petalflow.NewLLMNode("score_lead", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o-mini",
		SystemPrompt: `You are a lead scoring expert. Analyze the lead data and company information to score the lead from 0-100.
Consider these factors:
- Company size and revenue (larger = higher score)
- Industry fit (technology, finance, healthcare = higher)
- Job title seniority (C-level, VP, Director = higher)
- Email domain (corporate domain vs free email)
- Engagement signals (message content, source)
Return JSON:
{
"score": 0-100,
"reasons": ["reason1", "reason2", "reason3"],
"recommended_action": "hot_lead|nurture|disqualify"
}`,
		PromptTemplate: `Lead Information:
- Name: {{.Vars.lead.Name}}
- Email: {{.Vars.normalized_data.normalized_email}}
- Company: {{.Vars.lead.Company}}
- Message: {{.Vars.lead.Message}}
- Source: {{.Vars.lead.Source}}
Company Data:
{{range $k, $v := .Vars.company_data}}
- {{$k}}: {{$v}}
{{end}}
Contact Data:
{{range $k, $v := .Vars.contact_data}}
- {{$k}}: {{$v}}
{{end}}
Score this lead.`,
		OutputKey:      "lead_score_result",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.2,
	})

	// Stage 7: Build Final Output. An unparseable score does not fail the
	// run; the lead is emitted unscored with the reason in Errors.
	outputNode := petalflow.NewTransformNode("build_output", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput(inputs)
			if err != nil {
				return nil, err
			}
			normalizedData, err := asMap("normalized_data", inputs["normalized_data"])
			if err != nil {
				return nil, err
			}
			companyData, err := asMap("company_data", inputs["company_data"])
			if err != nil {
				return nil, err
			}
			contactData, err := asMap("contact_data", inputs["contact_data"])
			if err != nil {
				return nil, err
			}
			var errs []string
			score, err := parseLeadScore(inputs["lead_score_result"])
			if err != nil {
				errs = append(errs, err.Error())
			}
			return EnrichedLead{
				Lead:              lead,
				NormalizedEmail:   stringField(normalizedData, "normalized_email"),
				NormalizedPhone:   stringField(normalizedData, "normalized_phone"),
				NormalizedCompany: stringField(normalizedData, "normalized_company"),
				Domain:            stringField(normalizedData, "domain"),
				CompanyData:       companyData,
				ContactData:       contactData,
				LeadScore:         score.Score,
				ScoreReasons:      score.Reasons,
				IsDuplicate:       false,
				ProcessedAt:       time.Now().UTC(),
				Errors:            errs,
			}, nil
		},
		InputKeys: []string{"lead", "normalized_data", "company_data", "contact_data", "lead_score_result"},
		OutputKey: "enriched_lead",
	})

	// Add all nodes
	g.AddNode(validateNode)
	g.AddNode(handleInvalidNode)
	g.AddNode(normalizeNode)
	g.AddNode(dedupeNode)
	g.AddNode(dedupeRouter)
	g.AddNode(handleDuplicateNode)
	g.AddNode(enrichCompanyNode)
	g.AddNode(enrichContactNode)
	g.AddNode(scoreNode)
	g.AddNode(outputNode)

	// Define edges
	g.AddEdge("validate_input", "normalize_data")
	g.AddEdge("validate_input", "handle_invalid")
	g.AddEdge("normalize_data", "dedupe_check")
	g.AddEdge("dedupe_check", "dedupe_router")
	g.AddEdge("dedupe_router", "handle_duplicate")
	g.AddEdge("dedupe_router", "enrich_company")
	g.AddEdge("enrich_company", "enrich_contact")
	g.AddEdge("enrich_contact", "score_lead")
	g.AddEdge("score_lead", "build_output")
	g.SetEntry("validate_input")
	return g
}

// leadInput returns the Lead stored under "lead" in a node's inputs.
func leadInput(inputs map[string]any) (Lead, error) {
	lead, ok := inputs["lead"].(Lead)
	if !ok {
		return Lead{}, fmt.Errorf("input %q: expected Lead, got %T", "lead", inputs["lead"])
	}
	return lead, nil
}

// asMap asserts that v, the value of the variable named key, is a
// map[string]any, and returns an error naming the variable otherwise.
func asMap(key string, v any) (map[string]any, error) {
	m, ok := v.(map[string]any)
	if !ok {
		return nil, fmt.Errorf("variable %q: expected map[string]any, got %T", key, v)
	}
	return m, nil
}

// stringField returns m[key] when it holds a string, and "" otherwise
// (including when m is nil).
func stringField(m map[string]any, key string) string {
	s, _ := m[key].(string)
	return s
}

// normalizeEmail trims and lower-cases raw and returns it with its domain.
// mail.ParseAddress reduces display-name forms such as
// "Jane <jane@example.com>" to the bare address; if parsing fails, the
// trimmed input is used as-is. The domain is everything after the last "@"
// (the local part may legally contain a quoted "@"), or "" if there is none.
func normalizeEmail(raw string) (email, domain string) {
	email = strings.TrimSpace(raw)
	if addr, err := mail.ParseAddress(email); err == nil {
		email = addr.Address
	}
	email = strings.ToLower(email)
	if i := strings.LastIndex(email, "@"); i >= 0 {
		domain = email[i+1:]
	}
	return email, domain
}

// leadScore is the JSON object the score_lead system prompt asks for.
type leadScore struct {
	Score             int      `json:"score"`
	Reasons           []string `json:"reasons"`
	RecommendedAction string   `json:"recommended_action"`
}

// parseLeadScore decodes the score_lead output (expected to be a JSON
// string) and clamps Score to the 0-100 range the prompt requests, since
// model output is not guaranteed to honor it.
func parseLeadScore(raw any) (leadScore, error) {
	text, ok := raw.(string)
	if !ok {
		return leadScore{}, fmt.Errorf("lead_score_result: expected string, got %T", raw)
	}
	var s leadScore
	if err := json.Unmarshal([]byte(text), &s); err != nil {
		return leadScore{}, fmt.Errorf("parsing lead_score_result: %w", err)
	}
	if s.Score < 0 {
		s.Score = 0
	} else if s.Score > 100 {
		s.Score = 100
	}
	return s, nil
}
// Helper functions
// nonDigitRE matches any character other than an ASCII digit. It is compiled
// once at package init instead of on every normalizePhone call.
var nonDigitRE = regexp.MustCompile(`\D`)

// normalizePhone strips everything but digits from phone and formats the
// result as E.164 where it can:
//   - an explicit leading "+" (an international number) is preserved;
//   - 10 digits are treated as a US number and prefixed with "+1";
//   - 11 digits starting with "1" are treated as a US number and prefixed "+".
//
// Anything else is returned as bare digits, and input with no digits yields "".
func normalizePhone(phone string) string {
	digits := nonDigitRE.ReplaceAllString(phone, "")
	switch {
	case digits == "":
		return ""
	case strings.HasPrefix(strings.TrimSpace(phone), "+"):
		return "+" + digits
	case len(digits) == 10:
		return "+1" + digits
	case len(digits) == 11 && digits[0] == '1':
		return "+" + digits
	}
	return digits
}
// companyLegalSuffixes are the legal-entity suffixes normalizeCompanyName
// removes. Within each family, comma and trailing-period variants come first
// so the longest form wins (", Inc." before " Inc").
var companyLegalSuffixes = []string{
	", Inc.", ", Inc", " Inc.", " Inc",
	", LLC", " LLC",
	", Ltd.", ", Ltd", " Ltd.", " Ltd",
	", Corporation", " Corporation",
	", Corp.", ", Corp", " Corp.", " Corp",
}

// normalizeCompanyName trims company and removes at most one trailing legal
// suffix (case-insensitively), so "Example Corp, Inc." becomes "Example Corp"
// rather than having every matching suffix peeled off in turn. Whitespace
// left in front of the removed suffix is trimmed as well.
func normalizeCompanyName(company string) string {
	company = strings.TrimSpace(company)
	for _, suffix := range companyLegalSuffixes {
		cut := len(company) - len(suffix)
		if cut >= 0 && strings.EqualFold(company[cut:], suffix) {
			return strings.TrimSpace(company[:cut])
		}
	}
	return company
}
func checkDuplicate(ctx context.Context, email, domain string) (bool, string) {
// Mock implementation - replace with actual database check
return false, ""
}
func fetchCompanyData(ctx context.Context, domain string) (map[string]any, error) {
// Mock implementation - replace with actual API call
return map[string]any{
"name": "Example Corp",
"industry": "Technology",
"employee_count": 250,
"annual_revenue": "10M-50M",
"founded_year": 2015,
"headquarters": "San Francisco, CA",
"linkedin_url": "https://linkedin.com/company/example",
"enrichment_status": "success",
}, nil
}
func fetchContactData(ctx context.Context, email string) (map[string]any, error) {
// Mock implementation - replace with actual API call
return map[string]any{
"full_name": "Jane Smith",
"title": "VP of Engineering",
"seniority": "VP",
"department": "Engineering",
"linkedin_url": "https://linkedin.com/in/janesmith",
"enrichment_status": "success",
}, nil
}
// runEnrichmentPipeline runs graph once on a hard-coded sample lead with a
// 60-second deadline, logs each node's duration as it finishes, and prints
// the resulting EnrichedLead as indented JSON. It exits via log.Fatalf if the
// run fails, yields no "enriched_lead" EnrichedLead, or cannot be encoded.
func runEnrichmentPipeline(graph petalflow.Graph) {
	rt := petalflow.NewRuntime()

	// Sample lead
	lead := Lead{
		ID:      "lead_001",
		Email:   "jane.smith@example.com",
		Name:    "Jane Smith",
		Company: "Example Corp, Inc.",
		Phone:   "(555) 123-4567",
		Website: "https://example.com",
		Source:  "website_demo_request",
		Message: "I'm interested in learning more about your enterprise plan for our engineering team of 50 people.",
	}
	env := petalflow.NewEnvelope()
	env.SetVar("lead", lead)

	// Event handler: report per-node timings.
	handler := func(event petalflow.Event) {
		if event.Kind == petalflow.EventNodeEnd {
			log.Printf("%s: %v", event.NodeID, event.Duration)
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	result, err := rt.Run(ctx, graph, env, petalflow.RunOptions{
		EventHandler: handler,
	})
	if err != nil {
		log.Fatalf("Enrichment failed: %v", err)
	}

	// Output
	raw := result.GetVar("enriched_lead")
	enriched, ok := raw.(EnrichedLead)
	if !ok {
		log.Fatalf("Enrichment produced no enriched_lead (got %T)", raw)
	}
	output, err := json.MarshalIndent(enriched, "", " ")
	if err != nil {
		log.Fatalf("Encoding enriched lead: %v", err)
	}
	fmt.Printf("\n=== Enriched Lead ===\n%s\n", output)
}
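Example output (lead_score and score_reasons come from the LLM and will vary between runs; processed_at reflects when the run finished):

{
  "id": "lead_001",
  "email": "jane.smith@example.com",
  "name": "Jane Smith",
  "company": "Example Corp, Inc.",
  "phone": "(555) 123-4567",
  "website": "https://example.com",
  "source": "website_demo_request",
  "message": "I'm interested in learning more about your enterprise plan for our engineering team of 50 people.",
  "normalized_email": "jane.smith@example.com",
  "normalized_phone": "+15551234567",
  "normalized_company": "Example Corp",
  "domain": "example.com",
  "company_data": {
    "annual_revenue": "10M-50M",
    "employee_count": 250,
    "enrichment_status": "success",
    "founded_year": 2015,
    "headquarters": "San Francisco, CA",
    "industry": "Technology",
    "linkedin_url": "https://linkedin.com/company/example",
    "name": "Example Corp"
  },
  "contact_data": {
    "department": "Engineering",
    "enrichment_status": "success",
    "full_name": "Jane Smith",
    "linkedin_url": "https://linkedin.com/in/janesmith",
    "seniority": "VP",
    "title": "VP of Engineering"
  },
  "lead_score": 85,
  "score_reasons": [
    "VP-level seniority indicates decision-making authority",
    "Technology industry is strong fit",
    "Company size (250) matches ideal customer profile",
    "Demo request shows high intent"
  ],
  "is_duplicate": false,
  "processed_at": "2024-01-15T10:30:00Z"
}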

Process multiple leads in parallel with bounded concurrency (this snippet uses sync.WaitGroup, so add "sync" to the imports):

// processBatch enriches leads concurrently, running at most 10 pipelines at
// a time, and returns one EnrichedLead per input in input order. Each run
// gets its own 60-second deadline. A lead whose run fails or yields no
// "enriched_lead" EnrichedLead is returned with the failure recorded in
// Errors, so a single bad lead never aborts (or crashes) the batch.
//
// NOTE(review): all goroutines share one runtime, as in the original; this
// assumes petalflow's Runtime.Run is safe for concurrent use — confirm.
func processBatch(graph petalflow.Graph, leads []Lead) []EnrichedLead {
	const (
		maxConcurrentRuns = 10
		perLeadTimeout    = 60 * time.Second
	)
	rt := petalflow.NewRuntime()
	results := make([]EnrichedLead, len(leads))
	var wg sync.WaitGroup
	sem := make(chan struct{}, maxConcurrentRuns)
	for i, lead := range leads {
		// Acquire a slot before spawning so at most maxConcurrentRuns
		// goroutines exist at once, instead of one per lead up front.
		sem <- struct{}{}
		wg.Add(1)
		go func(idx int, l Lead) {
			defer wg.Done()
			defer func() { <-sem }()

			ctx, cancel := context.WithTimeout(context.Background(), perLeadTimeout)
			defer cancel()

			env := petalflow.NewEnvelope()
			env.SetVar("lead", l)
			result, err := rt.Run(ctx, graph, env, petalflow.RunOptions{})
			if err != nil {
				results[idx] = EnrichedLead{Lead: l, Errors: []string{err.Error()}, ProcessedAt: time.Now().UTC()}
				return
			}
			enriched, ok := result.GetVar("enriched_lead").(EnrichedLead)
			if !ok {
				results[idx] = EnrichedLead{Lead: l, Errors: []string{"pipeline produced no enriched_lead"}, ProcessedAt: time.Now().UTC()}
				return
			}
			// Each goroutine writes only its own index, so no lock is needed.
			results[idx] = enriched
		}(i, lead)
	}
	wg.Wait()
	return results
}

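Add support for multiple data providers by querying them concurrently and merging the results (clearbitClient, apolloClient, and mergeEnrichmentData are placeholders for your own clients and merge logic; the snippet also needs the "sync" import):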

// Parallel enrichment from multiple sources. A provider error is logged and
// that source's data is discarded; if every provider fails, company_data is
// set to {"enrichment_status": "failed"}, matching enrich_company.
enrichParallel := petalflow.NewToolNode("enrich_parallel", petalflow.ToolNodeConfig{
	ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
		normalizedData, ok := env.GetVar("normalized_data").(map[string]any)
		if !ok {
			return fmt.Errorf("enrich_parallel: normalized_data missing or not a map")
		}
		domain, _ := normalizedData["domain"].(string)

		var (
			wg                       sync.WaitGroup
			clearbitData, apolloData map[string]any
			clearbitErr, apolloErr   error
		)
		wg.Add(2)
		go func() {
			defer wg.Done()
			clearbitData, clearbitErr = clearbitClient.Enrich(ctx, domain)
		}()
		go func() {
			defer wg.Done()
			apolloData, apolloErr = apolloClient.Enrich(ctx, domain)
		}()
		wg.Wait()

		// Data returned alongside an error is not trusted.
		if clearbitErr != nil {
			log.Printf("Clearbit enrichment failed for %s: %v", domain, clearbitErr)
			clearbitData = nil
		}
		if apolloErr != nil {
			log.Printf("Apollo enrichment failed for %s: %v", domain, apolloErr)
			apolloData = nil
		}
		if clearbitErr != nil && apolloErr != nil {
			env.SetVar("company_data", map[string]any{"enrichment_status": "failed"})
			return nil
		}

		// Merge data, preferring Clearbit for company data
		env.SetVar("company_data", mergeEnrichmentData(clearbitData, apolloData))
		return nil
	},
	Timeout: 10 * time.Second,
})

Cache company-enrichment results to reduce API costs (redisStore is a placeholder for your cache store):

// Add cache before company enrichment, keyed by the normalized email domain.
// NOTE(review): in buildEnrichmentGraph, dedupe_router's Default still sends
// non-duplicates straight to "enrich_company", and that graph already has a
// dedupe_router -> enrich_company edge. Point Default at "company_cache" and
// drop the direct edge, or the cache is never consulted. The node must also
// be registered with g.AddNode(companyCache), like the other nodes.
companyCache := petalflow.NewCacheNode("company_cache", petalflow.CacheNodeConfig{
	Store:    redisStore,
	CacheKey: "enrich:company:{{.Vars.normalized_data.domain}}",
	TTL:      7 * 24 * time.Hour, // Cache for 1 week
})
g.AddEdge("dedupe_router", "company_cache")
g.AddEdge("company_cache", "enrich_company") // On cache miss