Skip to content

Human Review Loop

This example builds a content moderation system with human-in-the-loop review for flagged content, demonstrating approval workflows, escalation, and timeout handling.

┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Receive   │────▶│     AI      │────▶│    Risk     │
│   Content   │     │  Moderate   │     │   Router    │
└─────────────┘     └─────────────┘     └──────┬──────┘
                    ┌──────────────────────────┼──────────────────────────┐
                    ▼                          ▼                          ▼
             ┌─────────────┐            ┌─────────────┐            ┌─────────────┐
             │    Auto     │            │    Human    │            │    Auto     │
             │   Approve   │            │   Review    │            │   Reject    │
             └──────┬──────┘            └──────┬──────┘            └──────┬──────┘
                    │                          │                          │
                    │                    ┌─────┴─────┐                    │
                    │                    ▼           ▼                    │
                    │             ┌──────────┐ ┌──────────┐               │
                    │             │ Approved │ │ Rejected │               │
                    │             └────┬─────┘ └────┬─────┘               │
                    │                  │            │                     │
                    └──────────────────┴────────────┴─────────────────────┘
                                        ┌──────▼──────┐
                                        │  Finalize   │
                                        │  Decision   │
                                        └─────────────┘
  • Content moderation: Review user-generated content before publishing
  • Financial approvals: Require human sign-off for large transactions
  • Document review: Manual review of AI-generated documents
  • Compliance checks: Human verification for regulated workflows
  • Quality assurance: Review AI outputs before customer delivery
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/petal-labs/iris/providers/openai"
"github.com/petal-labs/petalflow"
"github.com/petal-labs/petalflow/irisadapter"
)
// Content is a single piece of user-submitted content entering the moderation workflow.
// The example stores it on the envelope under the "content" variable.
type Content struct {
// ID identifies the content item; the human-review callback also uses it as the review-queue key.
ID string `json:"id"`
// UserID identifies the user who submitted the content.
UserID string `json:"user_id"`
Type string `json:"type"` // post, comment, image, etc.
Text string `json:"text"`
// ImageURL is optional and is omitted from the JSON encoding when empty.
ImageURL string `json:"image_url,omitempty"`
SubmittedAt time.Time `json:"submitted_at"`
}
// ModerationResult is the AI moderation verdict, decoded from the JSON object
// the ai_moderate prompt asks the model to return.
type ModerationResult struct {
Safe bool `json:"safe"`
RiskScore float64 `json:"risk_score"` // 0-1; the router auto-rejects at >= 0.8 and auto-approves below 0.2
Categories []string `json:"categories"` // violence, hate, spam, etc.
// Confidence is the model's self-reported confidence; the prompt asks for a value in 0.0-1.0.
Confidence float64 `json:"confidence"`
Reasoning string `json:"reasoning"`
// RequiresReview requests human review; the prompt tells the model to set it when
// confidence < 0.8 or risk_score is between 0.3 and 0.7.
RequiresReview bool `json:"requires_review"`
}
// ReviewDecision is the verdict a human reviewer records for a queued content item.
type ReviewDecision struct {
Approved bool `json:"approved"`
ReviewerID string `json:"reviewer_id"`
Notes string `json:"notes"`
ReviewedAt time.Time `json:"reviewed_at"`
Action string `json:"action"` // approve, reject, escalate
}
// ModerationOutcome is the final moderation record produced by the
// finalize_decision node for one content item.
type ModerationOutcome struct {
ContentID string `json:"content_id"`
Decision string `json:"decision"` // approved, rejected
DecisionSource string `json:"decision_source"` // auto, human, or timeout
AIModeration ModerationResult `json:"ai_moderation"`
// HumanReview is set only when the decision source is "human"; when nil it is omitted from the JSON.
HumanReview *ReviewDecision `json:"human_review,omitempty"`
ProcessedAt time.Time `json:"processed_at"`
}
func main() {
provider := openai.New(os.Getenv("OPENAI_API_KEY"))
client := irisadapter.NewProviderAdapter(provider)
// Create review queue (in production, use Redis/database)
reviewQueue := NewReviewQueue()
graph := buildModerationGraph(client, reviewQueue)
runModerationWorkflow(graph, reviewQueue)
}
// ReviewQueue manages pending human reviews.
// It keeps everything in memory; the example notes that production code should
// use Redis or a database instead.
type ReviewQueue struct {
// pending maps a content ID to the channel on which its review decision is delivered.
pending map[string]chan ReviewDecision
// mu guards pending.
mu sync.RWMutex
}
// NewReviewQueue returns an empty ReviewQueue with its pending map initialized,
// so items can be submitted immediately.
func NewReviewQueue() *ReviewQueue {
	q := new(ReviewQueue)
	q.pending = map[string]chan ReviewDecision{}
	return q
}
// SubmitForReview registers contentID as awaiting human review and returns the
// channel on which the reviewer's decision will be delivered.
//
// The moderation summary is logged; content is accepted for the benefit of real
// integrations (review UI, Slack, email) but is not used here. Submitting the
// same contentID again replaces the previously registered channel.
func (q *ReviewQueue) SubmitForReview(contentID string, content Content, moderation ModerationResult) <-chan ReviewDecision {
	// Capacity 1 lets the single send in RecordDecision complete even when no
	// receiver is currently waiting.
	decisions := make(chan ReviewDecision, 1)

	q.mu.Lock()
	defer q.mu.Unlock()

	q.pending[contentID] = decisions

	// In production: send to review UI, Slack, email, etc.
	log.Printf("📋 Content %s submitted for human review", contentID)
	log.Printf(" Risk Score: %.2f", moderation.RiskScore)
	log.Printf(" Categories: %v", moderation.Categories)
	log.Printf(" Reasoning: %s", moderation.Reasoning)

	return decisions
}
// RecordDecision delivers decision to whoever is waiting on contentID and
// removes the item from the queue.
//
// It returns an error when contentID is not currently pending. On success the
// decision is sent on the item's channel and the channel is then closed.
func (q *ReviewQueue) RecordDecision(contentID string, decision ReviewDecision) error {
	q.mu.Lock()
	defer q.mu.Unlock()

	waiter, ok := q.pending[contentID]
	if !ok {
		return fmt.Errorf("content %s not found in review queue", contentID)
	}

	// Forget the item first; the channel is buffered, so the send below cannot block.
	delete(q.pending, contentID)
	waiter <- decision
	close(waiter)
	return nil
}
// GetPendingCount reports how many items are currently waiting for a human decision.
func (q *ReviewQueue) GetPendingCount() int {
	q.mu.RLock()
	count := len(q.pending)
	q.mu.RUnlock()
	return count
}
// buildModerationGraph assembles the content-moderation workflow graph.
//
// The graph validates the submitted content, asks the LLM for a moderation
// verdict, parses that verdict, and routes on risk: confident high-risk content
// is auto-rejected, confident low-risk content is auto-approved, and everything
// else goes to a human reviewer through reviewQueue. Every path writes a
// "routing_decision" variable, which finalize_decision turns into a
// ModerationOutcome stored under "moderation_outcome".
//
// client is the LLM adapter used by the ai_moderate node. reviewQueue receives
// the items that need human review. The entry node is validate_content.
func buildModerationGraph(client *irisadapter.ProviderAdapter, reviewQueue *ReviewQueue) petalflow.Graph {
	g := petalflow.NewGraph("content-moderation")

	// Stage 1: Validate Content
	validateNode := petalflow.NewGuardianNode("validate_content", petalflow.GuardianNodeConfig{
		Checks: []petalflow.GuardCheck{
			{Var: "content.id", Op: petalflow.OpNotEmpty, Message: "Content ID required"},
			{Var: "content.user_id", Op: petalflow.OpNotEmpty, Message: "User ID required"},
			{Var: "content.text", Op: petalflow.OpMaxLength, Value: 50000, Message: "Content too long"},
		},
		OnFail: petalflow.GuardActionReject,
	})

	// Stage 2: AI Moderation
	moderateNode := petalflow.NewLLMNode("ai_moderate", client, petalflow.LLMNodeConfig{
		Model: "gpt-4o",
		SystemPrompt: `You are a content moderation system. Analyze the content for policy violations.
Categories to check:
- violence: Threats, graphic violence, self-harm
- hate: Hate speech, discrimination, harassment
- sexual: Explicit sexual content, nudity
- spam: Advertising, scams, repetitive content
- misinformation: False claims, dangerous advice
- pii: Personal information exposure
Return JSON:
{
"safe": true/false,
"risk_score": 0.0-1.0,
"categories": ["category1", "category2"],
"confidence": 0.0-1.0,
"reasoning": "Brief explanation",
"requires_review": true/false (set true if confidence < 0.8 or risk_score between 0.3-0.7)
}`,
		PromptTemplate: `Content to moderate:
Type: {{.Vars.content.Type}}
Text: {{.Vars.content.Text}}
{{if .Vars.content.ImageURL}}Image URL: {{.Vars.content.ImageURL}}{{end}}
Analyze this content for policy violations.`,
		OutputKey:      "moderation_raw",
		ResponseFormat: petalflow.ResponseFormatJSON,
		Temperature:    0.1,
	})

	// Parse moderation result
	parseModeration := petalflow.NewTransformNode("parse_moderation", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			// Fail closed: anything that cannot be decoded is treated as
			// maximum risk and flagged for review. Confidence stays at zero, so
			// the router cannot auto-reject or auto-approve it.
			unparsed := ModerationResult{
				Safe:           false,
				RiskScore:      1.0,
				RequiresReview: true,
				Reasoning:      "Failed to parse moderation result",
			}
			raw, ok := inputs["moderation_raw"].(string)
			if !ok {
				// A missing or non-string LLM output must not panic the workflow.
				return unparsed, nil
			}
			var result ModerationResult
			if err := json.Unmarshal([]byte(raw), &result); err != nil {
				return unparsed, nil
			}
			return result, nil
		},
		InputKeys: []string{"moderation_raw"},
		OutputKey: "moderation_result",
	})

	// Stage 3: Risk-Based Routing
	riskRouter := petalflow.NewRuleRouter("risk_router", petalflow.RuleRouterConfig{
		Routes: []petalflow.RouteRule{
			// Auto-reject high-risk content
			{
				When: petalflow.RouteCondition{
					And: []petalflow.RouteCondition{
						{Var: "moderation_result.risk_score", Op: petalflow.OpGte, Value: 0.8},
						{Var: "moderation_result.confidence", Op: petalflow.OpGte, Value: 0.9},
					},
				},
				To: "auto_reject",
			},
			// Auto-approve low-risk content
			{
				When: petalflow.RouteCondition{
					And: []petalflow.RouteCondition{
						{Var: "moderation_result.risk_score", Op: petalflow.OpLt, Value: 0.2},
						{Var: "moderation_result.safe", Op: petalflow.OpEquals, Value: true},
						{Var: "moderation_result.confidence", Op: petalflow.OpGte, Value: 0.9},
					},
				},
				To: "auto_approve",
			},
			// Everything else requires human review
			{
				When: petalflow.RouteCondition{
					Var:   "moderation_result.requires_review",
					Op:    petalflow.OpEquals,
					Value: true,
				},
				To: "human_review",
			},
		},
		Default: "human_review", // Default to human review for safety
	})

	// Auto-approve path
	autoApproveNode := petalflow.NewTransformNode("auto_approve", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			return map[string]any{
				"decision":        "approved",
				"decision_source": "auto",
			}, nil
		},
		OutputKey: "routing_decision",
	})

	// Auto-reject path
	autoRejectNode := petalflow.NewTransformNode("auto_reject", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			moderation, err := typedInput[ModerationResult](inputs, "moderation_result")
			if err != nil {
				return nil, err
			}
			return map[string]any{
				"decision":        "rejected",
				"decision_source": "auto",
				"reason":          fmt.Sprintf("Auto-rejected: %s", moderation.Reasoning),
			}, nil
		},
		InputKeys: []string{"moderation_result"},
		OutputKey: "routing_decision",
	})

	// Human review path
	humanReviewNode := petalflow.NewHumanNode("human_review", petalflow.HumanNodeConfig{
		ReviewKey:    "content",
		ApprovalKey:  "human_decision",
		Timeout:      24 * time.Hour,
		OnTimeout:    petalflow.TimeoutActionRoute,
		TimeoutRoute: "timeout_handler",
		Callback: func(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
			content, ok := env.GetVar("content").(Content)
			if !ok {
				return petalflow.ApprovalResult{}, fmt.Errorf("human_review: variable %q is missing or not a Content", "content")
			}
			moderation, ok := env.GetVar("moderation_result").(ModerationResult)
			if !ok {
				return petalflow.ApprovalResult{}, fmt.Errorf("human_review: variable %q is missing or not a ModerationResult", "moderation_result")
			}

			// Submit to review queue and wait
			decisionCh := reviewQueue.SubmitForReview(content.ID, content, moderation)
			select {
			case decision := <-decisionCh:
				return petalflow.ApprovalResult{
					Approved: decision.Approved,
					Reviewer: decision.ReviewerID,
					Notes:    decision.Notes,
					Data: map[string]any{
						"action":      decision.Action,
						"reviewed_at": decision.ReviewedAt,
					},
				}, nil
			case <-ctx.Done():
				// NOTE: the item stays registered in reviewQueue; the queue
				// offers no way to withdraw a pending review.
				return petalflow.ApprovalResult{}, ctx.Err()
			}
		},
	})

	// Process human decision
	processHumanDecision := petalflow.NewTransformNode("process_human_decision", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			decision, err := typedInput[petalflow.ApprovalResult](inputs, "human_decision")
			if err != nil {
				return nil, err
			}
			// Carry the reviewer's action and timestamp forward so the final
			// outcome reports what the reviewer actually submitted. Indexing a
			// nil map is safe and simply yields nil values.
			extra, _ := any(decision.Data).(map[string]any)
			return map[string]any{
				"decision":        boolToDecision(decision.Approved),
				"decision_source": "human",
				"reviewer_id":     decision.Reviewer,
				"reviewer_notes":  decision.Notes,
				"action":          extra["action"],
				"reviewed_at":     extra["reviewed_at"],
			}, nil
		},
		InputKeys: []string{"human_decision"},
		OutputKey: "routing_decision",
	})

	// Timeout handler
	timeoutHandler := petalflow.NewTransformNode("timeout_handler", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			moderation, err := typedInput[ModerationResult](inputs, "moderation_result")
			if err != nil {
				return nil, err
			}
			// Default policy on timeout: reject when the risk score is above
			// 0.5, approve otherwise.
			decision := "approved"
			if moderation.RiskScore > 0.5 {
				decision = "rejected"
			}
			return map[string]any{
				"decision":        decision,
				"decision_source": "timeout",
				"reason":          "Review timed out, applied default policy",
			}, nil
		},
		InputKeys: []string{"moderation_result"},
		OutputKey: "routing_decision",
	})

	// Stage 4: Finalize Decision
	finalizeNode := petalflow.NewTransformNode("finalize_decision", petalflow.TransformNodeConfig{
		Transform: func(inputs map[string]any) (any, error) {
			content, err := typedInput[Content](inputs, "content")
			if err != nil {
				return nil, err
			}
			moderation, err := typedInput[ModerationResult](inputs, "moderation_result")
			if err != nil {
				return nil, err
			}
			routingDecision, err := typedInput[map[string]any](inputs, "routing_decision")
			if err != nil {
				return nil, err
			}
			decision, err := typedInput[string](routingDecision, "decision")
			if err != nil {
				return nil, err
			}
			source, err := typedInput[string](routingDecision, "decision_source")
			if err != nil {
				return nil, err
			}

			outcome := ModerationOutcome{
				ContentID:      content.ID,
				Decision:       decision,
				DecisionSource: source,
				AIModeration:   moderation,
				ProcessedAt:    time.Now().UTC(),
			}

			// Add human review details if present
			if source == "human" {
				reviewerID, _ := routingDecision["reviewer_id"].(string)
				notes, _ := routingDecision["reviewer_notes"].(string)
				action, _ := routingDecision["action"].(string)
				// Prefer the reviewer's own timestamp; fall back to the
				// processing time when none was supplied.
				reviewedAt, ok := routingDecision["reviewed_at"].(time.Time)
				if !ok || reviewedAt.IsZero() {
					reviewedAt = outcome.ProcessedAt
				}
				outcome.HumanReview = &ReviewDecision{
					Approved:   decision == "approved",
					ReviewerID: reviewerID,
					Notes:      notes,
					ReviewedAt: reviewedAt,
					Action:     action,
				}
			}
			return outcome, nil
		},
		InputKeys: []string{"content", "moderation_result", "routing_decision"},
		OutputKey: "moderation_outcome",
	})

	// Add all nodes
	g.AddNode(validateNode)
	g.AddNode(moderateNode)
	g.AddNode(parseModeration)
	g.AddNode(riskRouter)
	g.AddNode(autoApproveNode)
	g.AddNode(autoRejectNode)
	g.AddNode(humanReviewNode)
	g.AddNode(processHumanDecision)
	g.AddNode(timeoutHandler)
	g.AddNode(finalizeNode)

	// Define edges
	g.AddEdge("validate_content", "ai_moderate")
	g.AddEdge("ai_moderate", "parse_moderation")
	g.AddEdge("parse_moderation", "risk_router")
	g.AddEdge("risk_router", "auto_approve")
	g.AddEdge("risk_router", "auto_reject")
	g.AddEdge("risk_router", "human_review")
	g.AddEdge("auto_approve", "finalize_decision")
	g.AddEdge("auto_reject", "finalize_decision")
	g.AddEdge("human_review", "process_human_decision")
	g.AddEdge("human_review", "timeout_handler")
	g.AddEdge("process_human_decision", "finalize_decision")
	g.AddEdge("timeout_handler", "finalize_decision")

	g.SetEntry("validate_content")
	return g
}

// typedInput returns inputs[key] asserted to type T. It reports a descriptive
// error, rather than panicking, when the key is absent or holds another type.
func typedInput[T any](inputs map[string]any, key string) (T, error) {
	value, ok := inputs[key].(T)
	if !ok {
		var zero T
		return zero, fmt.Errorf("input %q: expected %T, got %T", key, zero, inputs[key])
	}
	return value, nil
}
// boolToDecision converts an approval flag into the decision label used in
// routing decisions: "approved" for true, "rejected" for false.
func boolToDecision(approved bool) string {
	decision := "rejected"
	if approved {
		decision = "approved"
	}
	return decision
}
// runModerationWorkflow runs one sample piece of content through graph and
// plays the part of the human reviewer.
//
// The workflow runs in its own goroutine because the human-review node blocks
// until a decision is recorded. This function waits until the item actually
// appears in reviewQueue (or the workflow ends without needing review), records
// a simulated rejection, and then waits for the workflow to finish. A workflow
// failure terminates the program through log.Fatalf.
func runModerationWorkflow(graph petalflow.Graph, reviewQueue *ReviewQueue) {
	runtime := petalflow.NewRuntime()

	// Sample content requiring review
	content := Content{
		ID:          "content_001",
		UserID:      "user_123",
		Type:        "post",
		Text:        "Check out this amazing deal! Click here to win a free iPhone! Limited time only!!!",
		SubmittedAt: time.Now().UTC(),
	}
	env := petalflow.NewEnvelope()
	env.SetVar("content", content)

	// done is closed when the workflow goroutine has printed its outcome.
	done := make(chan struct{})

	// Run in goroutine since human review will block
	go func() {
		defer close(done)

		handler := func(event petalflow.Event) {
			switch event.Kind {
			case petalflow.EventNodeEnd:
				log.Printf("%s: %v", event.NodeID, event.Duration)
			case petalflow.EventHumanPending:
				log.Printf("⏳ Waiting for human review...")
			case petalflow.EventRouteDecision:
				log.Printf("→ Routed to: %s", event.Data["target"])
			}
		}

		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()

		result, err := runtime.Run(ctx, graph, env, petalflow.RunOptions{
			EventHandler: handler,
		})
		if err != nil {
			log.Fatalf("Moderation failed: %v", err)
		}

		outcome, ok := result.GetVar("moderation_outcome").(ModerationOutcome)
		if !ok {
			log.Fatalf("Moderation finished without a moderation_outcome")
		}
		output, err := json.MarshalIndent(outcome, "", " ")
		if err != nil {
			log.Fatalf("Encoding moderation outcome: %v", err)
		}
		fmt.Printf("\n=== Moderation Outcome ===\n%s\n", output)
	}()

	// Wait until the item is really in the queue. The LLM call takes an
	// unpredictable amount of time, and content that is auto-approved or
	// auto-rejected never reaches the queue at all.
	poll := time.NewTicker(50 * time.Millisecond)
	defer poll.Stop()
	for reviewQueue.GetPendingCount() == 0 {
		select {
		case <-done:
			return
		case <-poll.C:
		}
	}

	// Simulate human reviewer after 2 seconds
	time.Sleep(2 * time.Second)
	log.Println("📝 Simulating human review decision...")
	err := reviewQueue.RecordDecision(content.ID, ReviewDecision{
		Approved:   false,
		ReviewerID: "moderator_42",
		Notes:      "Appears to be spam/scam content. Reject.",
		Action:     "reject",
		ReviewedAt: time.Now().UTC(),
	})
	if err != nil {
		log.Printf("Recording review decision: %v", err)
		return
	}

	// Wait for workflow to complete
	<-done
}
✓ validate_content: 1ms
✓ ai_moderate: 1.2s
✓ parse_moderation: 1ms
→ Routed to: human_review
⏳ Waiting for human review...
📋 Content content_001 submitted for human review
Risk Score: 0.65
Categories: [spam]
Reasoning: Content shows characteristics of promotional spam
📝 Simulating human review decision...
✓ human_review: 2.1s
✓ process_human_decision: 1ms
✓ finalize_decision: 1ms
=== Moderation Outcome ===
{
"content_id": "content_001",
"decision": "rejected",
"decision_source": "human",
"ai_moderation": {
"safe": false,
"risk_score": 0.65,
"categories": ["spam"],
"confidence": 0.85,
"reasoning": "Content shows characteristics of promotional spam",
"requires_review": true
},
"human_review": {
"approved": false,
"reviewer_id": "moderator_42",
"notes": "Appears to be spam/scam content. Reject.",
"reviewed_at": "2024-01-15T10:30:02Z",
"action": "reject"
},
"processed_at": "2024-01-15T10:30:02Z"
}

Add multi-level approval for sensitive content:

// First-level review
// level1_review presents the "content" variable to a first-line reviewer with a
// one-hour Timeout. level1ReviewCallback is not defined in this snippet.
level1Review := petalflow.NewHumanNode("level1_review", petalflow.HumanNodeConfig{
ReviewKey: "content",
Timeout: 1 * time.Hour,
Callback: level1ReviewCallback,
})
// Escalation router
// Sends the item to level2_review when the first-level action is "escalate";
// every other case falls through to finalize_decision.
// NOTE(review): the rule reads "level1_decision.action", but level1_review above
// sets no ApprovalKey — confirm how the first-level decision ends up under that name.
escalationRouter := petalflow.NewRuleRouter("check_escalation", petalflow.RuleRouterConfig{
Routes: []petalflow.RouteRule{
// Escalate to senior reviewer if L1 is uncertain
{
When: petalflow.RouteCondition{
Var: "level1_decision.action",
Op: petalflow.OpEquals,
Value: "escalate",
},
To: "level2_review",
},
},
Default: "finalize_decision",
})
// Senior reviewer
// level2_review presents the same "content" variable with a four-hour Timeout.
// level2ReviewCallback is not defined in this snippet.
level2Review := petalflow.NewHumanNode("level2_review", petalflow.HumanNodeConfig{
ReviewKey: "content",
Timeout: 4 * time.Hour,
Callback: level2ReviewCallback,
})

Send review requests to Slack:

// slackReviewCallback posts a review request with Approve / Reject / Escalate
// buttons to the #content-review Slack channel and waits for a reviewer to
// click one.
//
// It reads the "content" and "moderation_result" variables from env and returns
// an error if either is missing or has an unexpected type. The wait stops with
// ctx.Err() when ctx is cancelled or times out. The clicked action is returned
// in ApprovalResult.Data["action"] so that downstream nodes can tell "escalate"
// apart from "reject".
//
// NOTE(review): decisions are read from the shared slackInteractionChannel and
// are not matched against content.ID — confirm that only one review is in
// flight at a time, or route interactions per content item.
func slackReviewCallback(ctx context.Context, env *petalflow.Envelope) (petalflow.ApprovalResult, error) {
	content, ok := env.GetVar("content").(Content)
	if !ok {
		return petalflow.ApprovalResult{}, fmt.Errorf("slack review: variable %q is missing or not a Content", "content")
	}
	moderation, ok := env.GetVar("moderation_result").(ModerationResult)
	if !ok {
		return petalflow.ApprovalResult{}, fmt.Errorf("slack review: variable %q is missing or not a ModerationResult", "moderation_result")
	}

	// Post to Slack with interactive buttons
	msg := slack.Message{
		Channel: "#content-review",
		Blocks: []slack.Block{
			slack.Header("Content Review Required"),
			slack.Section(fmt.Sprintf("Risk Score: %.2f", moderation.RiskScore)),
			slack.Section(fmt.Sprintf("Categories: %v", moderation.Categories)),
			slack.Section(fmt.Sprintf("Content: %s", truncate(content.Text, 200))),
			slack.Actions(
				slack.Button("approve", "Approve", "approve_"+content.ID),
				slack.Button("reject", "Reject", "reject_"+content.ID),
				slack.Button("escalate", "Escalate", "escalate_"+content.ID),
			),
		},
	}
	slackClient.PostMessage(msg)

	// Wait for Slack interaction, but give up as soon as the workflow does.
	select {
	case decision := <-slackInteractionChannel:
		return petalflow.ApprovalResult{
			Approved: decision.Action == "approve",
			Reviewer: decision.UserID,
			Data: map[string]any{
				"action": decision.Action,
			},
		}, nil
	case <-ctx.Done():
		return petalflow.ApprovalResult{}, ctx.Err()
	}
}

Process multiple items in a single review session:

// Collect items for batch review
// The transform ignores its inputs and returns a map holding a freshly generated
// batch ID, the collected items, and their count.
// NOTE(review): no OutputKey is set here, while batch_review below reads
// ReviewKey "batch_items" — confirm that the result is stored under the node name.
batchNode := petalflow.NewTransformNode("batch_items", petalflow.TransformNodeConfig{
Transform: func(inputs map[string]any) (any, error) {
// Collect pending items with similar risk profiles
// (getPendingItems is not defined in this snippet; 10 is presumably the maximum batch size — TODO confirm.)
items := getPendingItems(10)
return map[string]any{
"batch_id": uuid.New().String(),
"items": items,
"item_count": len(items),
}, nil
},
})
// Human reviews entire batch
// batch_review presents the "batch_items" variable with a two-hour Timeout.
// batchReviewCallback is not defined in this snippet.
batchReview := petalflow.NewHumanNode("batch_review", petalflow.HumanNodeConfig{
ReviewKey: "batch_items",
Timeout: 2 * time.Hour,
Callback: batchReviewCallback,
})