Human Review Loop

This example builds a content moderation system with human-in-the-loop review for flagged content, demonstrating approval workflows, escalation, and timeout handling.
What You’ll Build

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Receive   │────▶│     AI      │────▶│    Risk     │
│   Content   │     │  Moderate   │     │   Router    │
└─────────────┘     └─────────────┘     └──────┬──────┘
                                               │
       ┌───────────────────────────────────────┼───────────────────────┐
       ▼                                       ▼                       ▼
┌─────────────┐                         ┌─────────────┐         ┌─────────────┐
│    Auto     │                         │    Human    │         │    Auto     │
│   Approve   │                         │   Review    │         │   Reject    │
└──────┬──────┘                         └──────┬──────┘         └──────┬──────┘
       │                                 ┌─────┴─────┐                 │
       │                                 ▼           ▼                 │
       │                           ┌──────────┐ ┌──────────┐           │
       │                           │ Approved │ │ Rejected │           │
       │                           └────┬─────┘ └────┬─────┘           │
       │                                │            │                 │
       └────────────────────────────────┼────────────┴─────────────────┘
                                        │
                                 ┌──────▼──────┐
                                 │  Finalize   │
                                 │  Decision   │
                                 └─────────────┘
```

Use Cases
- Content moderation: Review user-generated content before publishing
- Financial approvals: Require human sign-off for large transactions
- Document review: Manual review of AI-generated documents
- Compliance checks: Human verification for regulated workflows
- Quality assurance: Review AI outputs before customer delivery
Complete Implementation
Setup and Imports
```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/petal-labs/iris/providers/openai"
	"github.com/petal-labs/petalflow"
	"github.com/petal-labs/petalflow/irisadapter"
)

// Content represents user-submitted content
type Content struct {
	ID          string    `json:"id"`
	UserID      string    `json:"user_id"`
	Type        string    `json:"type"` // post, comment, image, etc.
	Text        string    `json:"text"`
	ImageURL    string    `json:"image_url,omitempty"`
	SubmittedAt time.Time `json:"submitted_at"`
}

// ModerationResult contains AI moderation output
type ModerationResult struct {
	Safe           bool     `json:"safe"`
	RiskScore      float64  `json:"risk_score"` // 0-1
	Categories     []string `json:"categories"` // violence, hate, spam, etc.
	Confidence     float64  `json:"confidence"`
	Reasoning      string   `json:"reasoning"`
	RequiresReview bool     `json:"requires_review"`
}

// ReviewDecision from human reviewer
type ReviewDecision struct {
	Approved   bool      `json:"approved"`
	ReviewerID string    `json:"reviewer_id"`
	Notes      string    `json:"notes"`
	ReviewedAt time.Time `json:"reviewed_at"`
	Action     string    `json:"action"` // approve, reject, escalate
}

// ModerationOutcome is the final moderation outcome
type ModerationOutcome struct {
	ContentID      string           `json:"content_id"`
	Decision       string           `json:"decision"`        // approved, rejected
	DecisionSource string           `json:"decision_source"` // auto, human
	AIModeration   ModerationResult `json:"ai_moderation"`
	HumanReview    *ReviewDecision  `json:"human_review,omitempty"`
	ProcessedAt    time.Time        `json:"processed_at"`
}

func main() {
	provider := openai.New(os.Getenv("OPENAI_API_KEY"))
	client := irisadapter.NewProviderAdapter(provider)

	// Create review queue (in production, use Redis/database)
	reviewQueue := NewReviewQueue()

	graph := buildModerationGraph(client, reviewQueue)
	runModerationWorkflow(graph, reviewQueue)
}
```
Review Queue Implementation

```go
// ReviewQueue manages pending human reviews
type ReviewQueue struct {
	pending map[string]chan ReviewDecision
	mu      sync.RWMutex
}

func NewReviewQueue() *ReviewQueue {
	return &ReviewQueue{
		pending: make(map[string]chan ReviewDecision),
	}
}

func (q *ReviewQueue) SubmitForReview(contentID string, content Content, moderation ModerationResult) <-chan ReviewDecision {
	q.mu.Lock()
	defer q.mu.Unlock()

	ch := make(chan ReviewDecision, 1)
	q.pending[contentID] = ch

	// In production: send to review UI, Slack, email, etc.
	log.Printf("📋 Content %s submitted for human review", contentID)
	log.Printf("   Risk Score: %.2f", moderation.RiskScore)
	log.Printf("   Categories: %v", moderation.Categories)
	log.Printf("   Reasoning: %s", moderation.Reasoning)

	return ch
}

func (q *ReviewQueue) RecordDecision(contentID string, decision ReviewDecision) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	ch, exists := q.pending[contentID]
	if !exists {
		return fmt.Errorf("content %s not found in review queue", contentID)
	}

	ch <- decision
	close(ch)
	delete(q.pending, contentID)
	return nil
}

func (q *ReviewQueue) GetPendingCount() int {
	q.mu.RLock()
	defer q.mu.RUnlock()
	return len(q.pending)
}
```
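In production, `RecordDecision` would be called from whatever surface the reviewer uses, not from the simulation later in this example. A minimal sketch of an HTTP endpoint a review UI could call, using only the standard library (the route and payload shape are assumptions, not part of petalflow; requires a `net/http` import):

```go
// Hypothetical endpoint for a review UI to resolve a pending review.
// Expects POST with {"content_id": "...", "decision": {...}}.
func reviewHandler(queue *ReviewQueue) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		var req struct {
			ContentID string         `json:"content_id"`
			Decision  ReviewDecision `json:"decision"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			http.Error(w, "invalid payload", http.StatusBadRequest)
			return
		}
		req.Decision.ReviewedAt = time.Now().UTC()
		if err := queue.RecordDecision(req.ContentID, req.Decision); err != nil {
			// Unknown content ID: the review may have already timed out.
			http.Error(w, err.Error(), http.StatusNotFound)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}
}
```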
Building the Graph

```go
func buildModerationGraph(client *irisadapter.ProviderAdapter, reviewQueue *ReviewQueue) petalflow.Graph {
	g := petalflow.NewGraph("content-moderation")

	// Stage 1: Validate Content
	validateNode := petalflow.NewGuardianNode("validate_content", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "content.id", Op: petalflow.OpNotEmpty, Message: "Content ID required"},
			{Var: "content.user_id", Op: petalflow.OpNotEmpty, Message: "User ID required"},
			{Var: "content.text", Op: petalflow.OpMaxLength, Value: 50000, Message: "Content too long"},
		},
		OnFail: petalflow.GuardActionReject,
	})

	// Stage 2: AI Moderation
	moderateNode := petalflow.NewLLMNode("ai_moderate", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o",
		SystemPrompt: `You are a content moderation system. Analyze the content for policy violations.

Categories to check:
- violence: Threats, graphic violence, self-harm
- hate: Hate speech, discrimination, harassment
- sexual: Explicit sexual content, nudity
- spam: Advertising, scams, repetitive content
- misinformation: False claims, dangerous advice
- pii: Personal information exposure

Return JSON:
{
  "safe": true/false,
  "risk_score": 0.0-1.0,
  "categories": ["category1", "category2"],
  "confidence": 0.0-1.0,
  "reasoning": "Brief explanation",
  "requires_review": true/false (set true if confidence < 0.8 or risk_score between 0.3-0.7)
}`,
		PromptTemplate: `Content to moderate:
Type: {{.Vars.content.Type}}
Text: {{.Vars.content.Text}}
{{if .Vars.content.ImageURL}}Image URL: {{.Vars.content.ImageURL}}{{end}}

Analyze this content for policy violations.`,
		OutputKey:      "moderation_raw",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.1,
	})

	// Parse moderation result
	parseModeration := petalflow.NewTransformNode("parse_moderation", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			raw := inputs["moderation_raw"].(string)
			var result ModerationResult
			if err := json.Unmarshal([]byte(raw), &result); err != nil {
				return ModerationResult{
					Safe:           false,
					RiskScore:      1.0,
					RequiresReview: true,
					Reasoning:      "Failed to parse moderation result",
				}, nil
			}
			return result, nil
		},
		InputKeys: []string{"moderation_raw"},
		OutputKey: "moderation_result",
	})

	// Stage 3: Risk-Based Routing
	riskRouter := petalflow.NewRuleRouter("risk_router", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			// Auto-reject high-risk content
			{
				When: petalflow.RouteCondition{
					And: []petalflow.RouteCondition{
						{Var: "moderation_result.risk_score", Op: petalflow.OpGte, Value: 0.8},
						{Var: "moderation_result.confidence", Op: petalflow.OpGte, Value: 0.9},
					},
				},
				To: "auto_reject",
			},
			// Auto-approve low-risk content
			{
				When: petalflow.RouteCondition{
					And: []petalflow.RouteCondition{
						{Var: "moderation_result.risk_score", Op: petalflow.OpLt, Value: 0.2},
						{Var: "moderation_result.safe", Op: petalflow.OpEquals, Value: true},
						{Var: "moderation_result.confidence", Op: petalflow.OpGte, Value: 0.9},
					},
				},
				To: "auto_approve",
			},
			// Everything else requires human review
			{
				When: petalflow.RouteCondition{
					Var:   "moderation_result.requires_review",
					Op:    petalflow.OpEquals,
					Value: true,
				},
				To: "human_review",
			},
		},
		Default: "human_review", // Default to human review for safety
	})

	// Auto-approve path
	autoApproveNode := petalflow.NewTransformNode("auto_approve", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			return map[string]any{
				"decision":        "approved",
				"decision_source": "auto",
			}, nil
		},
		OutputKey: "routing_decision",
	})

	// Auto-reject path
	autoRejectNode := petalflow.NewTransformNode("auto_reject", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			moderation := inputs["moderation_result"].(ModerationResult)
			return map[string]any{
				"decision":        "rejected",
				"decision_source": "auto",
				"reason":          fmt.Sprintf("Auto-rejected: %s", moderation.Reasoning),
			}, nil
		},
		InputKeys: []string{"moderation_result"},
		OutputKey: "routing_decision",
	})

	// Human review path
	humanReviewNode := petalflow.NewHumanNode("human_review", petalflow.HumanNodeConfig{
		ReviewKey:    "content",
		ApprovalKey:  "human_decision",
		Timeout:      24 * time.Hour,
		OnTimeout:    petalflow.TimeoutActionRoute,
		TimeoutRoute: "timeout_handler",
		Callback: func(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
			content := env.GetVar("content").(Content)
			moderation := env.GetVar("moderation_result").(ModerationResult)

			// Submit to review queue and wait
			decisionCh := reviewQueue.SubmitForReview(content.ID, content, moderation)

			select {
			case decision := <-decisionCh:
				return petalflow.ApprovalResult{
					Approved: decision.Approved,
					Reviewer: decision.ReviewerID,
					Notes:    decision.Notes,
					Data: map[string]any{
						"action":      decision.Action,
						"reviewed_at": decision.ReviewedAt,
					},
				}, nil
			case <-ctx.Done():
				return petalflow.ApprovalResult{}, ctx.Err()
			}
		},
	})

	// Process human decision
	processHumanDecision := petalflow.NewTransformNode("process_human_decision", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			decision := inputs["human_decision"].(petalflow.ApprovalResult)
			return map[string]any{
				"decision":        boolToDecision(decision.Approved),
				"decision_source": "human",
				"reviewer_id":     decision.Reviewer,
				"reviewer_notes":  decision.Notes,
			}, nil
		},
		InputKeys: []string{"human_decision"},
		OutputKey: "routing_decision",
	})

	// Timeout handler
	timeoutHandler := petalflow.NewTransformNode("timeout_handler", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			moderation := inputs["moderation_result"].(ModerationResult)
			// Conservative approach: reject on timeout for risky content
			decision := "approved"
			if moderation.RiskScore > 0.5 {
				decision = "rejected"
			}
			return map[string]any{
				"decision":        decision,
				"decision_source": "timeout",
				"reason":          "Review timed out, applied default policy",
			}, nil
		},
		InputKeys: []string{"moderation_result"},
		OutputKey: "routing_decision",
	})

	// Stage 4: Finalize Decision
	finalizeNode := petalflow.NewTransformNode("finalize_decision", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			content := inputs["content"].(Content)
			moderation := inputs["moderation_result"].(ModerationResult)
			routingDecision := inputs["routing_decision"].(map[string]any)

			outcome := ModerationOutcome{
				ContentID:      content.ID,
				Decision:       routingDecision["decision"].(string),
				DecisionSource: routingDecision["decision_source"].(string),
				AIModeration:   moderation,
				ProcessedAt:    time.Now().UTC(),
			}

			// Add human review details if present
			if routingDecision["decision_source"] == "human" {
				outcome.HumanReview = &ReviewDecision{
					Approved:   routingDecision["decision"] == "approved",
					ReviewerID: routingDecision["reviewer_id"].(string),
					Notes:      routingDecision["reviewer_notes"].(string),
					ReviewedAt: time.Now().UTC(),
				}
			}

			return outcome, nil
		},
		InputKeys: []string{"content", "moderation_result", "routing_decision"},
		OutputKey: "moderation_outcome",
	})

	// Add all nodes
	g.AddNode(validateNode)
	g.AddNode(moderateNode)
	g.AddNode(parseModeration)
	g.AddNode(riskRouter)
	g.AddNode(autoApproveNode)
	g.AddNode(autoRejectNode)
	g.AddNode(humanReviewNode)
	g.AddNode(processHumanDecision)
	g.AddNode(timeoutHandler)
	g.AddNode(finalizeNode)

	// Define edges
	g.AddEdge("validate_content", "ai_moderate")
	g.AddEdge("ai_moderate", "parse_moderation")
	g.AddEdge("parse_moderation", "risk_router")
	g.AddEdge("risk_router", "auto_approve")
	g.AddEdge("risk_router", "auto_reject")
	g.AddEdge("risk_router", "human_review")
	g.AddEdge("auto_approve", "finalize_decision")
	g.AddEdge("auto_reject", "finalize_decision")
	g.AddEdge("human_review", "process_human_decision")
	g.AddEdge("human_review", "timeout_handler")
	g.AddEdge("process_human_decision", "finalize_decision")
	g.AddEdge("timeout_handler", "finalize_decision")

	g.SetEntry("validate_content")

	return g
}

func boolToDecision(approved bool) string {
	if approved {
		return "approved"
	}
	return "rejected"
}
```
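Reading the router's rules together: content is auto-rejected only when the model is both high-risk and high-confidence, auto-approved only when it is low-risk, marked safe, and high-confidence, and everything else lands in human review. A plain-Go restatement of those thresholds, useful for table-driven tests without running the graph (`routeFor` is a hypothetical helper, not petalflow API, and assumes rules match in declaration order):

```go
// routeFor restates the risk_router thresholds so they can be unit-tested
// in isolation. Hypothetical helper; mirrors the rules defined above.
func routeFor(m ModerationResult) string {
	switch {
	case m.RiskScore >= 0.8 && m.Confidence >= 0.9:
		return "auto_reject"
	case m.RiskScore < 0.2 && m.Safe && m.Confidence >= 0.9:
		return "auto_approve"
	default:
		// Covers requires_review == true and the router's safety default.
		return "human_review"
	}
}
```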
Running the Workflow

```go
func runModerationWorkflow(graph petalflow.Graph, reviewQueue *ReviewQueue) {
	runtime := petalflow.NewRuntime()

	// Sample content requiring review
	content := Content{
		ID:          "content_001",
		UserID:      "user_123",
		Type:        "post",
		Text:        "Check out this amazing deal! Click here to win a free iPhone! Limited time only!!!",
		SubmittedAt: time.Now().UTC(),
	}

	env := petalflow.NewEnvelope()
	env.SetVar("content", content)

	// Run in goroutine since human review will block
	go func() {
		handler := func(event petalflow.Event) {
			switch event.Kind {
			case petalflow.EventNodeEnd:
				log.Printf("✓ %s: %v", event.NodeID, event.Duration)
			case petalflow.EventHumanPending:
				log.Printf("⏳ Waiting for human review...")
			case petalflow.EventRouteDecision:
				log.Printf("→ Routed to: %s", event.Data["target"])
			}
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
			EventHandler: handler,
		})
		if err != nil {
			log.Fatalf("Moderation failed: %v", err)
		}

		outcome := result.GetVar("moderation_outcome").(ModerationOutcome)
		output, _ := json.MarshalIndent(outcome, "", "  ")
		fmt.Printf("\n=== Moderation Outcome ===\n%s\n", output)
	}()

	// Simulate human reviewer after 2 seconds
	time.Sleep(2 * time.Second)

	log.Println("📝 Simulating human review decision...")
	reviewQueue.RecordDecision("content_001", ReviewDecision{
		Approved:   false,
		ReviewerID: "moderator_42",
		Notes:      "Appears to be spam/scam content. Reject.",
		Action:     "reject",
		ReviewedAt: time.Now().UTC(),
	})

	// Wait for workflow to complete
	time.Sleep(1 * time.Second)
}
```
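The trailing `Sleep` is a demo shortcut and technically races with the workflow goroutine. A sketch of the same flow using `sync.WaitGroup` for a deterministic wait (structure only; the elided bodies are exactly the ones shown above):

```go
// Demo wiring with a deterministic wait instead of the final Sleep.
var wg sync.WaitGroup
wg.Add(1)
go func() {
	defer wg.Done()
	// ... run the graph and print the outcome, as above ...
}()

time.Sleep(2 * time.Second) // simulated reviewer latency
reviewQueue.RecordDecision("content_001", ReviewDecision{ /* ... as above ... */ })
wg.Wait() // block until the workflow goroutine finishes
```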
Example Output

```
✓ validate_content: 1ms
✓ ai_moderate: 1.2s
✓ parse_moderation: 1ms
→ Routed to: human_review
⏳ Waiting for human review...
📋 Content content_001 submitted for human review
   Risk Score: 0.65
   Categories: [spam]
   Reasoning: Content shows characteristics of promotional spam
📝 Simulating human review decision...
✓ human_review: 2.1s
✓ process_human_decision: 1ms
✓ finalize_decision: 1ms

=== Moderation Outcome ===
{
  "content_id": "content_001",
  "decision": "rejected",
  "decision_source": "human",
  "ai_moderation": {
    "safe": false,
    "risk_score": 0.65,
    "categories": ["spam"],
    "confidence": 0.85,
    "reasoning": "Content shows characteristics of promotional spam",
    "requires_review": true
  },
  "human_review": {
    "approved": false,
    "reviewer_id": "moderator_42",
    "notes": "Appears to be spam/scam content. Reject.",
    "reviewed_at": "2024-01-15T10:30:02Z",
    "action": "reject"
  },
  "processed_at": "2024-01-15T10:30:02Z"
}
```
Variations

Escalation Chain

Add multi-level approval for sensitive content:
```go
// First-level review
level1Review := petalflow.NewHumanNode("level1_review", petalflow.HumanNodeConfig{
	ReviewKey: "content",
	Timeout:   1 * time.Hour,
	Callback:  level1ReviewCallback,
})

// Escalation router
escalationRouter := petalflow.NewRuleRouter("check_escalation", petalflow.RuleRouterConfig{
	Routes: []petalflow.RouteRule{
		// Escalate to senior reviewer if L1 is uncertain
		{
			When: petalflow.RouteCondition{
				Var:   "level1_decision.action",
				Op:    petalflow.OpEquals,
				Value: "escalate",
			},
			To: "level2_review",
		},
	},
	Default: "finalize_decision",
})

// Senior reviewer
level2Review := petalflow.NewHumanNode("level2_review", petalflow.HumanNodeConfig{
	ReviewKey: "content",
	Timeout:   4 * time.Hour,
	Callback:  level2ReviewCallback,
})
```
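The snippet defines the nodes but leaves out the wiring. A sketch using the same `AddNode`/`AddEdge` calls as the main graph (it assumes the L1 node writes its result under `level1_decision`, which the router condition above implies but the snippet does not show):

```go
// Hypothetical wiring for the escalation chain, mirroring the main graph.
g.AddNode(level1Review)
g.AddNode(escalationRouter)
g.AddNode(level2Review)

g.AddEdge("level1_review", "check_escalation")      // L1 decision feeds the router
g.AddEdge("check_escalation", "level2_review")      // escalate path
g.AddEdge("check_escalation", "finalize_decision")  // default path
g.AddEdge("level2_review", "finalize_decision")
```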
Slack Integration

Send review requests to Slack:
```go
func slackReviewCallback(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
	content := env.GetVar("content").(Content)
	moderation := env.GetVar("moderation_result").(ModerationResult)

	// Post to Slack with interactive buttons
	msg := slack.Message{
		Channel: "#content-review",
		Blocks: []slack.Block{
			slack.Header("Content Review Required"),
			slack.Section(fmt.Sprintf("Risk Score: %.2f", moderation.RiskScore)),
			slack.Section(fmt.Sprintf("Categories: %v", moderation.Categories)),
			slack.Section(fmt.Sprintf("Content: %s", truncate(content.Text, 200))),
			slack.Actions(
				slack.Button("approve", "Approve", "approve_"+content.ID),
				slack.Button("reject", "Reject", "reject_"+content.ID),
				slack.Button("escalate", "Escalate", "escalate_"+content.ID),
			),
		},
	}

	slackClient.PostMessage(msg)

	// Wait for Slack interaction
	decision := <-slackInteractionChannel
	return petalflow.ApprovalResult{
		Approved: decision.Action == "approve",
		Reviewer: decision.UserID,
	}, nil
}
```
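The callback references a `truncate` helper the example never defines; a minimal version (an assumption, since the original elides it):

```go
// truncate shortens s to at most n runes, marking the cut with an ellipsis.
func truncate(s string, n int) string {
	r := []rune(s)
	if len(r) <= n {
		return s
	}
	return string(r[:n]) + "…"
}
```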
Batch Review Interface

Process multiple items in a single review session:
```go
// Collect items for batch review
batchNode := petalflow.NewTransformNode("batch_items", petalflow.TransformNodeConfig{
	Transform: func(inputs map[string]any) (any, error) {
		// Collect pending items with similar risk profiles
		items := getPendingItems(10)
		return map[string]any{
			"batch_id":   uuid.New().String(),
			"items":      items,
			"item_count": len(items),
		}, nil
	},
})

// Human reviews entire batch
batchReview := petalflow.NewHumanNode("batch_review", petalflow.HumanNodeConfig{
	ReviewKey: "batch_items",
	Timeout:   2 * time.Hour,
	Callback:  batchReviewCallback,
})
```
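A single batch decision still has to be fanned back out to the individual items. A sketch of a follow-up transform (this assumes the batch node sets `ApprovalKey: "batch_decision"` and that `items` holds `[]Content`; both are assumptions about code the example elides):

```go
// Hypothetical fan-out: apply one batch-level decision to every item.
applyBatchDecision := petalflow.NewTransformNode("apply_batch_decision", petalflow.TransformNodeConfig{
	Transform: func(inputs map[string]any) (any, error) {
		decision := inputs["batch_decision"].(petalflow.ApprovalResult)
		batch := inputs["batch_items"].(map[string]any)
		items := batch["items"].([]Content)

		outcomes := make([]ModerationOutcome, 0, len(items))
		for _, item := range items {
			outcomes = append(outcomes, ModerationOutcome{
				ContentID:      item.ID,
				Decision:       boolToDecision(decision.Approved),
				DecisionSource: "human",
				ProcessedAt:    time.Now().UTC(),
			})
		}
		return outcomes, nil
	},
	InputKeys: []string{"batch_decision", "batch_items"},
	OutputKey: "batch_outcomes",
})
```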