Data Enrichment
Data Enrichment
Section titled “Data Enrichment”
This example builds a data enrichment pipeline that validates, normalizes, deduplicates, and enriches business records (inbound leads) using external data APIs, then scores each lead with an LLM.
What You’ll Build
Section titled “What You’ll Build”
┌─────────────┐     ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│  Validate   │────▶│  Normalize  │────▶│   Dedupe    │────▶│   Enrich    │
│   Input     │     │    Data     │     │    Check    │     │   Company   │
└─────────────┘     └─────────────┘     └─────────────┘     └──────┬──────┘
                                                                   │
                    ┌──────────────────────────────────────────────┘
                    ▼
             ┌─────────────┐     ┌─────────────┐     ┌─────────────┐
             │   Enrich    │────▶│    Score    │────▶│   Output    │
             │   Contact   │     │    Lead     │     │   Record    │
             └─────────────┘     └─────────────┘     └─────────────┘
Use Cases
Section titled “Use Cases”
- Lead enrichment: Enhance inbound leads with company and contact data
- CRM hygiene: Clean and normalize existing CRM records
- Data migration: Transform and validate data during system migrations
- List building: Enrich prospect lists with firmographic data
Complete Implementation
Section titled “Complete Implementation”Setup and Imports
Section titled “Setup and Imports”package main
import ( "context" "encoding/json" "fmt" "log" "net/mail" "os" "regexp" "strings" "time"
"github.com/petal-labs/iris/providers/openai" "github.com/petal-labs/petalflow" "github.com/petal-labs/petalflow/irisadapter")
// Lead represents an inbound lead record as it enters the pipeline, before
// validation, normalization, or enrichment. The pipeline reads it from the
// "lead" variable.
type Lead struct {
	ID      string `json:"id"`      // record identifier, e.g. "lead_001"
	Email   string `json:"email"`   // raw email; validate_input requires it to be non-empty and well-formed
	Name    string `json:"name"`    // contact name, included in the scoring prompt
	Company string `json:"company"` // free-form company name, e.g. "Example Corp, Inc."
	Phone   string `json:"phone"`   // free-form phone number, e.g. "(555) 123-4567"
	Website string `json:"website"` // company website URL as supplied
	Source  string `json:"source"`  // lead origin, e.g. "website_demo_request"; included in the scoring prompt
	Message string `json:"message"` // free-text message from the lead; included in the scoring prompt
}
// EnrichedLead contains the enriched data: every terminal node of the graph
// (handle_invalid, handle_duplicate, build_output) writes one of these to the
// "enriched_lead" variable. Lead is embedded, so its JSON fields are emitted
// inline alongside the fields below.
type EnrichedLead struct {
	Lead
	NormalizedEmail   string         `json:"normalized_email"`   // trimmed, lower-cased email from normalize_data
	NormalizedPhone   string         `json:"normalized_phone"`   // phone as produced by normalizePhone
	NormalizedCompany string         `json:"normalized_company"` // company name as produced by normalizeCompanyName
	Domain            string         `json:"domain"`             // domain part of the normalized email
	CompanyData       map[string]any `json:"company_data"`       // fetchCompanyData result, or {"enrichment_status": "failed"} on error
	ContactData       map[string]any `json:"contact_data"`       // fetchContactData result, or {"enrichment_status": "failed"} on error
	LeadScore         int            `json:"lead_score"`         // score from the score_lead LLM step (the prompt asks for 0-100)
	ScoreReasons      []string       `json:"score_reasons"`      // reasons returned alongside the score
	IsDuplicate       bool           `json:"is_duplicate"`       // true when dedupe_check found an existing record
	DuplicateOf       string         `json:"duplicate_of,omitempty"` // ID of the existing record when IsDuplicate is true
	ProcessedAt       time.Time      `json:"processed_at"`           // set from time.Now().UTC() when the record is built
	Errors            []string       `json:"errors,omitempty"`       // e.g. validation errors for records routed to handle_invalid
}
func main() { provider := openai.New(os.Getenv("OPENAI_API_KEY")) client := irisadapter.NewProviderAdapter(provider)
graph := buildEnrichmentGraph(client) runEnrichmentPipeline(graph)}Building the Graph
Section titled “Building the Graph”
// buildEnrichmentGraph assembles the lead-enrichment graph:
//
//	validate_input -> normalize_data -> dedupe_check -> dedupe_router
//	    -> enrich_company -> enrich_contact -> score_lead -> build_output
//
// Leads failing validation are routed to handle_invalid and duplicates to
// handle_duplicate. Each terminal node writes an EnrichedLead to the
// "enriched_lead" variable.
func buildEnrichmentGraph(client *irisadapter.ProviderAdapter) petalflow.Graph {
	g := petalflow.NewGraph("lead-enrichment")

	// Stage 1: Validate Input
	// NOTE: the checks run on the raw email, so leading/trailing whitespace
	// fails the format check even though normalize_data would trim it.
	validateNode := petalflow.NewGuardianNode("validate_input", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "lead.email", Op: petalflow.OpNotEmpty, Message: "Email is required"},
			{Var: "lead.email", Op: petalflow.OpMatches, Value: `^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$`, Message: "Invalid email format"},
		},
		OnFail:    petalflow.GuardActionRoute,
		FailRoute: "handle_invalid",
		ErrorKey:  "validation_errors",
	})

	// Handle invalid records
	handleInvalidNode := petalflow.NewTransformNode("handle_invalid", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput("handle_invalid", inputs)
			if err != nil {
				return nil, err
			}
			errs, ok := inputs["validation_errors"].([]string)
			if !ok {
				// Keep the record flagged even if the errors arrive in an
				// unexpected shape, rather than panicking.
				errs = []string{fmt.Sprintf("validation failed: %v", inputs["validation_errors"])}
			}
			return EnrichedLead{
				Lead:        lead,
				Errors:      errs,
				ProcessedAt: time.Now().UTC(),
			}, nil
		},
		InputKeys: []string{"lead", "validation_errors"},
		OutputKey: "enriched_lead",
	})

	// Stage 2: Normalize Data
	normalizeNode := petalflow.NewTransformNode("normalize_data", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput("normalize_data", inputs)
			if err != nil {
				return nil, err
			}
			email, domain := normalizeEmail(lead.Email)
			return map[string]any{
				"normalized_email":   email,
				"normalized_phone":   normalizePhone(lead.Phone),
				"normalized_company": normalizeCompanyName(lead.Company),
				"domain":             domain,
			}, nil
		},
		InputKeys: []string{"lead"},
		OutputKey: "normalized_data",
	})

	// Stage 3: Deduplication Check
	dedupeNode := petalflow.NewToolNode("dedupe_check", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := normalizedDataVar("dedupe_check", env)
			if err != nil {
				return err
			}

			// Check for existing records (mock implementation)
			existing, duplicateID := checkDuplicate(ctx,
				normalizedField(normalizedData, "normalized_email"),
				normalizedField(normalizedData, "domain"))

			env.SetVar("is_duplicate", existing)
			if existing {
				env.SetVar("duplicate_of", duplicateID)
			}
			return nil
		},
		Timeout: 5 * time.Second,
	})

	// Route based on duplicate status
	dedupeRouter := petalflow.NewRuleRouter("dedupe_router", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			{
				When: petalflow.RouteCondition{Var: "is_duplicate", Op: petalflow.OpEquals, Value: true},
				To:   "handle_duplicate",
			},
		},
		Default: "enrich_company",
	})

	// Handle duplicate
	handleDuplicateNode := petalflow.NewTransformNode("handle_duplicate", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput("handle_duplicate", inputs)
			if err != nil {
				return nil, err
			}
			normalizedData, _ := inputs["normalized_data"].(map[string]any)
			duplicateOf, _ := inputs["duplicate_of"].(string)
			return EnrichedLead{
				Lead:              lead,
				NormalizedEmail:   normalizedField(normalizedData, "normalized_email"),
				NormalizedPhone:   normalizedField(normalizedData, "normalized_phone"),
				NormalizedCompany: normalizedField(normalizedData, "normalized_company"),
				Domain:            normalizedField(normalizedData, "domain"),
				IsDuplicate:       true,
				DuplicateOf:       duplicateOf,
				ProcessedAt:       time.Now().UTC(),
			}, nil
		},
		InputKeys: []string{"lead", "normalized_data", "duplicate_of"},
		OutputKey: "enriched_lead",
	})

	// Stage 4: Company Enrichment
	enrichCompanyNode := petalflow.NewToolNode("enrich_company", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := normalizedDataVar("enrich_company", env)
			if err != nil {
				return err
			}
			domain := normalizedField(normalizedData, "domain")

			// Call company data API (e.g., Clearbit, Apollo, ZoomInfo).
			// Best-effort: a failure is logged and marked, not fatal.
			companyData, err := fetchCompanyData(ctx, domain)
			if err != nil {
				log.Printf("Company enrichment failed for %s: %v", domain, err)
				companyData = map[string]any{"enrichment_status": "failed"}
			}

			env.SetVar("company_data", companyData)
			return nil
		},
		Timeout: 10 * time.Second,
	})

	// Stage 5: Contact Enrichment
	enrichContactNode := petalflow.NewToolNode("enrich_contact", petalflow.ToolNodeConfig{
		ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error {
			normalizedData, err := normalizedDataVar("enrich_contact", env)
			if err != nil {
				return err
			}
			email := normalizedField(normalizedData, "normalized_email")

			// Call contact data API. Best-effort, like company enrichment.
			contactData, err := fetchContactData(ctx, email)
			if err != nil {
				log.Printf("Contact enrichment failed for %s: %v", email, err)
				contactData = map[string]any{"enrichment_status": "failed"}
			}

			env.SetVar("contact_data", contactData)
			return nil
		},
		Timeout: 10 * time.Second,
	})

	// Stage 6: LLM-based Lead Scoring
	scoreNode := petalflow.NewLLMNode("score_lead", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o-mini",
		SystemPrompt: `You are a lead scoring expert. Analyze the lead data and company information to score the lead from 0-100.

Consider these factors:
- Company size and revenue (larger = higher score)
- Industry fit (technology, finance, healthcare = higher)
- Job title seniority (C-level, VP, Director = higher)
- Email domain (corporate domain vs free email)
- Engagement signals (message content, source)

Return JSON:
{
  "score": 0-100,
  "reasons": ["reason1", "reason2", "reason3"],
  "recommended_action": "hot_lead|nurture|disqualify"
}`,
		// Each range item ends with a newline so the data is one "- key: value"
		// entry per line rather than a single run-on line.
		PromptTemplate: `Lead Information:
- Name: {{.Vars.lead.Name}}
- Email: {{.Vars.normalized_data.normalized_email}}
- Company: {{.Vars.lead.Company}}
- Message: {{.Vars.lead.Message}}
- Source: {{.Vars.lead.Source}}

Company Data:
{{range $k, $v := .Vars.company_data}}- {{$k}}: {{$v}}
{{end}}
Contact Data:
{{range $k, $v := .Vars.contact_data}}- {{$k}}: {{$v}}
{{end}}
Score this lead.`,
		OutputKey:      "lead_score_result",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.2,
	})

	// Stage 7: Build Final Output
	outputNode := petalflow.NewTransformNode("build_output", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			lead, err := leadInput("build_output", inputs)
			if err != nil {
				return nil, err
			}
			normalizedData, _ := inputs["normalized_data"].(map[string]any)
			companyData, _ := inputs["company_data"].(map[string]any)
			contactData, _ := inputs["contact_data"].(map[string]any)

			enriched := EnrichedLead{
				Lead:              lead,
				NormalizedEmail:   normalizedField(normalizedData, "normalized_email"),
				NormalizedPhone:   normalizedField(normalizedData, "normalized_phone"),
				NormalizedCompany: normalizedField(normalizedData, "normalized_company"),
				Domain:            normalizedField(normalizedData, "domain"),
				CompanyData:       companyData,
				ContactData:       contactData,
				IsDuplicate:       false,
				ProcessedAt:       time.Now().UTC(),
			}

			// Record an unusable model reply on the lead instead of silently
			// emitting a zero score.
			score, reasons, err := parseLeadScore(inputs["lead_score_result"])
			if err != nil {
				enriched.Errors = append(enriched.Errors, err.Error())
			} else {
				enriched.LeadScore = score
				enriched.ScoreReasons = reasons
			}
			return enriched, nil
		},
		InputKeys: []string{"lead", "normalized_data", "company_data", "contact_data", "lead_score_result"},
		OutputKey: "enriched_lead",
	})

	// Add all nodes
	g.AddNode(validateNode)
	g.AddNode(handleInvalidNode)
	g.AddNode(normalizeNode)
	g.AddNode(dedupeNode)
	g.AddNode(dedupeRouter)
	g.AddNode(handleDuplicateNode)
	g.AddNode(enrichCompanyNode)
	g.AddNode(enrichContactNode)
	g.AddNode(scoreNode)
	g.AddNode(outputNode)

	// Define edges
	g.AddEdge("validate_input", "normalize_data")
	g.AddEdge("validate_input", "handle_invalid")
	g.AddEdge("normalize_data", "dedupe_check")
	g.AddEdge("dedupe_check", "dedupe_router")
	g.AddEdge("dedupe_router", "handle_duplicate")
	g.AddEdge("dedupe_router", "enrich_company")
	g.AddEdge("enrich_company", "enrich_contact")
	g.AddEdge("enrich_contact", "score_lead")
	g.AddEdge("score_lead", "build_output")

	g.SetEntry("validate_input")

	return g
}

// leadInput returns the Lead stored under "lead" in a transform's inputs, or
// an error naming the node when it is missing or has the wrong type.
func leadInput(node string, inputs map[string]any) (Lead, error) {
	lead, ok := inputs["lead"].(Lead)
	if !ok {
		return Lead{}, fmt.Errorf("%s: input %q is missing or not a Lead", node, "lead")
	}
	return lead, nil
}

// normalizedDataVar returns the map written by normalize_data, or an error
// naming the node when the variable is missing or has the wrong type.
func normalizedDataVar(node string, env *petalflow.Envelope) (map[string]any, error) {
	data, ok := env.GetVar("normalized_data").(map[string]any)
	if !ok {
		return nil, fmt.Errorf("%s: variable %q is missing or not a map", node, "normalized_data")
	}
	return data, nil
}

// normalizedField returns data[key] as a string, or "" when the key is absent,
// not a string, or data is nil.
func normalizedField(data map[string]any, key string) string {
	s, _ := data[key].(string)
	return s
}

// normalizeEmail trims and lower-cases raw and returns it with its domain, the
// text after the last '@' ("" when there is none). If raw parses as an
// RFC 5322 address such as "Jane <jane@example.com>", only the bare address
// is kept.
func normalizeEmail(raw string) (email, domain string) {
	email = strings.ToLower(strings.TrimSpace(raw))
	if addr, err := mail.ParseAddress(email); err == nil {
		email = addr.Address
	}
	if at := strings.LastIndexByte(email, '@'); at >= 0 {
		domain = email[at+1:]
	}
	return email, domain
}

// parseLeadScore decodes the JSON text produced by score_lead. The score is
// decoded as a number so replies like 85.0 are accepted, clamped to the
// 0-100 range the prompt requests, and rounded to the nearest integer.
// recommended_action is requested by the prompt but is not stored on
// EnrichedLead, so it is not decoded here.
func parseLeadScore(raw any) (int, []string, error) {
	text, ok := raw.(string)
	if !ok {
		return 0, nil, fmt.Errorf("lead scoring: expected JSON text, got %T", raw)
	}
	var result struct {
		Score   float64  `json:"score"`
		Reasons []string `json:"reasons"`
	}
	if err := json.Unmarshal([]byte(text), &result); err != nil {
		return 0, nil, fmt.Errorf("lead scoring: decoding model response: %w", err)
	}
	score := result.Score
	switch {
	case score < 0:
		score = 0
	case score > 100:
		score = 100
	}
	return int(score + 0.5), result.Reasons, nil
}
// Helper functions

// nonDigits matches any character that is not an ASCII digit. It is compiled
// once at package initialization instead of on every normalizePhone call.
var nonDigits = regexp.MustCompile(`\D`)

// normalizePhone strips formatting from a phone number and, where the country
// code is known, returns it in E.164 form:
//   - input written with a leading '+' keeps it in front of its digits;
//   - 10 digits are treated as a US number and prefixed with "+1";
//   - 11 digits starting with '1' are treated as US and prefixed with "+".
//
// Anything else is returned as bare digits ("" when there are none).
func normalizePhone(phone string) string {
	digits := nonDigits.ReplaceAllString(phone, "")
	switch {
	case digits == "":
		return ""
	case strings.HasPrefix(strings.TrimSpace(phone), "+"):
		// An explicit '+' means the country code is already present; without
		// this branch "+61 2 1234 5678" would be mis-prefixed as US.
		return "+" + digits
	case len(digits) == 10:
		return "+1" + digits
	case len(digits) == 11 && digits[0] == '1':
		return "+" + digits
	default:
		return digits
	}
}
// companySuffixes lists the legal-entity suffixes normalizeCompanyName removes.
// Only the first match is stripped, so more specific variants must come before
// the shorter ones they end with (", Inc." before ", Inc", " Corp." before
// " Corp").
var companySuffixes = []string{
	", Inc.", ", Inc", " Inc.", " Inc",
	", LLC", " LLC",
	", Ltd.", ", Ltd", " Ltd.", " Ltd",
	" Corporation", " Corp.", " Corp",
}

// normalizeCompanyName trims whitespace and removes at most one trailing
// legal-entity suffix. For example, "Example Corp, Inc." becomes "Example Corp".
// Stripping only one suffix keeps words such as "Corp" that are part of the
// name itself; the earlier version also stripped them and produced "Example".
func normalizeCompanyName(company string) string {
	company = strings.TrimSpace(company)
	for _, suffix := range companySuffixes {
		if strings.HasSuffix(company, suffix) {
			// Drop separators left behind, e.g. "Foo, Corp" -> "Foo".
			return strings.TrimRight(strings.TrimSuffix(company, suffix), ", ")
		}
	}
	return company
}
// checkDuplicate is meant to report whether a record matching the normalized
// email/domain already exists and, if so, that record's ID.
//
// Mock implementation - replace with actual database check. This version
// never finds a match: it always returns false and an empty ID.
func checkDuplicate(ctx context.Context, email, domain string) (bool, string) {
	var matchedID string // nothing stored to match against yet
	found := matchedID != ""
	return found, matchedID
}
// fetchCompanyData is meant to look up firmographic data for a company domain.
//
// Mock implementation - replace with actual API call. It ignores ctx and
// domain and always returns the same "Example Corp" profile with a nil error.
func fetchCompanyData(ctx context.Context, domain string) (map[string]any, error) {
	profile := make(map[string]any, 8)
	profile["name"] = "Example Corp"
	profile["industry"] = "Technology"
	profile["employee_count"] = 250
	profile["annual_revenue"] = "10M-50M"
	profile["founded_year"] = 2015
	profile["headquarters"] = "San Francisco, CA"
	profile["linkedin_url"] = "https://linkedin.com/company/example"
	profile["enrichment_status"] = "success"
	return profile, nil
}
func fetchContactData(ctx context.Context, email string) (map[string]any, error) { // Mock implementation - replace with actual API call return map[string]any{ "full_name": "Jane Smith", "title": "VP of Engineering", "seniority": "VP", "department": "Engineering", "linkedin_url": "https://linkedin.com/in/janesmith", "enrichment_status": "success", }, nil}Running the Pipeline
Section titled “Running the Pipeline”
// runEnrichmentPipeline runs graph once on a hard-coded sample lead, logging
// each node as it finishes, and prints the resulting EnrichedLead as indented
// JSON. Any failure ends the process via log.Fatalf.
func runEnrichmentPipeline(graph petalflow.Graph) {
	runtime := petalflow.NewRuntime()

	// Sample lead
	lead := Lead{
		ID:      "lead_001",
		Email:   "jane.smith@example.com",
		Name:    "Jane Smith",
		Company: "Example Corp, Inc.",
		Phone:   "(555) 123-4567",
		Website: "https://example.com",
		Source:  "website_demo_request",
		Message: "I'm interested in learning more about your enterprise plan for our engineering team of 50 people.",
	}

	env := petalflow.NewEnvelope()
	env.SetVar("lead", lead)

	// Event handler: log each node's duration when it ends.
	handler := func(event petalflow.Event) {
		if event.Kind == petalflow.EventNodeEnd {
			log.Printf("✓ %s: %v", event.NodeID, event.Duration)
		}
	}

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
		EventHandler: handler,
	})
	if err != nil {
		log.Fatalf("Enrichment failed: %v", err)
	}

	// Every terminal node writes "enriched_lead"; check anyway so a
	// misconfigured graph produces a clear message instead of a panic.
	enriched, ok := result.GetVar("enriched_lead").(EnrichedLead)
	if !ok {
		log.Fatalf("Enrichment produced no enriched_lead (got %T)", result.GetVar("enriched_lead"))
	}

	// Output
	output, err := json.MarshalIndent(enriched, "", " ")
	if err != nil {
		log.Fatalf("Encoding enriched lead failed: %v", err)
	}
	fmt.Printf("\n=== Enriched Lead ===\n%s\n", output)
}
Example Output
Section titled “Example Output”
Abbreviated output for the sample lead. Some fields are omitted, and the score and reasons vary with the model's response.
{
  "id": "lead_001",
  "email": "jane.smith@example.com",
  "name": "Jane Smith",
  "company": "Example Corp, Inc.",
  "normalized_email": "jane.smith@example.com",
  "normalized_phone": "+15551234567",
  "normalized_company": "Example Corp",
  "domain": "example.com",
  "company_data": {
    "name": "Example Corp",
    "industry": "Technology",
    "employee_count": 250,
    "annual_revenue": "10M-50M",
    "founded_year": 2015,
    "headquarters": "San Francisco, CA"
  },
  "contact_data": {
    "full_name": "Jane Smith",
    "title": "VP of Engineering",
    "seniority": "VP",
    "department": "Engineering"
  },
  "lead_score": 85,
  "score_reasons": [
    "VP-level seniority indicates decision-making authority",
    "Technology industry is strong fit",
    "Company size (250) matches ideal customer profile",
    "Demo request shows high intent"
  ],
  "is_duplicate": false,
  "processed_at": "2024-01-15T10:30:00Z"
}
Variations
Section titled “Variations”Batch Processing
Section titled “Batch Processing”
Process multiple leads in parallel with bounded concurrency. This variation also needs `sync` in the import list:
func processBatch(graph petalflow.Graph, leads []Lead) []EnrichedLead { runtime := petalflow.NewRuntime() results := make([]EnrichedLead, len(leads)) var wg sync.WaitGroup sem := make(chan struct{}, 10) // Limit concurrency
for i, lead := range leads { wg.Add(1) go func(idx int, l Lead) { defer wg.Done() sem <- struct{}{} defer func() { <-sem }()
env := petalflow.NewEnvelope() env.SetVar("lead", l)
result, err := runtime.Run(context.Background(), graph, env, petalflow.RunOptions{}) if err != nil { results[idx] = EnrichedLead{Lead: l, Errors: []string{err.Error()}} return } results[idx] = result.GetVar("enriched_lead").(EnrichedLead) }(i, lead) }
wg.Wait() return results}Custom Enrichment Providers
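Section titled “Custom Enrichment Providers”
Add support for multiple data providers. The snippet assumes you supply `clearbitClient`, `apolloClient`, and a `mergeEnrichmentData` function, and that `sync` is imported: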
// Parallel enrichment from multiple sourcesenrichParallel := petalflow.NewToolNode("enrich_parallel", petalflow.ToolNodeConfig{ ToolFunc: func(ctx context.Context, env *petalflow.Envelope) error { domain := env.GetVar("normalized_data").(map[string]any)["domain"].(string)
var wg sync.WaitGroup var clearbitData, apolloData map[string]any
wg.Add(2) go func() { defer wg.Done() clearbitData, _ = clearbitClient.Enrich(ctx, domain) }() go func() { defer wg.Done() apolloData, _ = apolloClient.Enrich(ctx, domain) }() wg.Wait()
// Merge data, preferring Clearbit for company data merged := mergeEnrichmentData(clearbitData, apolloData) env.SetVar("company_data", merged) return nil },})Enrichment with Caching
Section titled “Enrichment with Caching”Cache enrichment results to reduce API costs:
// Add cache before company enrichment
// redisStore is not defined in this example; supply your own cache store.
// NOTE(review): the dedupe_router rule router is built with
// Default: "enrich_company" and an edge dedupe_router -> enrich_company, so
// non-duplicates presumably still bypass this cache. The default route
// likely needs to become "company_cache" and the direct edge removed.
// The snippet also does not show where a cache hit continues (presumably
// enrich_contact). Confirm against the RuleRouter/CacheNode documentation.
companyCache := petalflow.NewCacheNode("company_cache", petalflow.CacheNodeConfig{
	Store:    redisStore,
	CacheKey: "enrich:company:{{.Vars.normalized_data.domain}}", // one entry per email domain
	TTL:      7 * 24 * time.Hour, // Cache for 1 week
})

g.AddEdge("dedupe_router", "company_cache")
g.AddEdge("company_cache", "enrich_company") // On cache miss